千家信息网

Message Queue Selector如何实现顺序消费

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Message Queue Selector如何实现顺序消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。顺序消息的定义:顺序消息是指消
千家信息网最后更新 2025年12月03日Message Queue Selector如何实现顺序消费

Message Queue Selector如何实现顺序消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

顺序消息的定义:

顺序消息是指消息的消费顺序和生产顺序相同,在某些场景下,必须保证顺序消息。比如订单的生成、付款、发货.顺序消息又分为全局顺序消息和部分顺序消息,全局顺序消息指某一个topic下的所有消息都要保证顺序;部分顺序消息只要保证某一组消息被顺序消费。对于订单消息来说,只要保证同一个订单ID的生成、付款、发货消息按照顺序消费即可。

部分顺序消费实现原理:

1. 发送端:保证相同订单ID的各种消息发往同一个MessageQueue(同一个Topic下的某一个queue)

2.消费端:保证同一个MessageQueue里面的消息不被并发处理 (同一个Topic的不同MessageQueue是可以同时消费的)

        DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");                producer.setNamesrvAddr("10.76.0.38:9876");                producer.start();                for (int i = 0; i < 1000; i++) {                        Order order  = new Order();                        order.orderId = i;                        order.status = "生成";                        Message msg1 = new Message("local-test-producer",                                        "TagA",                                        JsonUtils.toJson(order).getBytes()                        );                        SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {                                @Override                                public MessageQueue select(List mqs, Message msg, Object arg) {                                        return null;                                }                        }, order.orderId);                        log.info("sendResult1={}",sendResult1);                        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);                        order.status="付款";                        Message msg2 = new Message("local-test-producer",                                        "TagA",                                        JsonUtils.toJson(order).getBytes()                        );                        SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {                                @Override                                public MessageQueue select(List mqs, Message msg, Object arg) {                                        return null;                                }                        }, order.orderId);                        log.info("sendResult2={}",sendResult2);                        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);                        order.status="发货";                        Message msg3 = new Message("local-test-producer",                                        "TagA",                                        JsonUtils.toJson(order).getBytes()                        );                        producer.send(msg2, new MessageQueueSelector() {                                @Override                                public MessageQueue select(List mqs, Message msg, Object arg) {                                        return null;                                }                        }, order.orderId);                        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);                        SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {                                @Override                                public MessageQueue select(List mqs, Message msg, Object arg) {                                        Integer id = (Integer) arg;                                        int index = id % mqs.size();                                        return mqs.get(index);                                }                                //MessageQueueSelector保证同一个orderId的消息都存储在同一个MessageQueue。                        }, order.orderId);                        log.info("sendResult3={}",sendResult1);                }

消费端主要逻辑如下,主要MessageListenerOrderly回调实现同一个MessageQueue里面的消息不会被并发消费:

       //同一个MessageQueue里面的消息要顺序消费,不能并发消费。                //但是同一个Topic的不同MessageQueue是可以同时消费的                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");                consumer.setNamesrvAddr("10.76.0.38:9876");                consumer.subscribe("test", "");                consumer.setPullBatchSize(1);                consumer.setConsumeThreadMin(1);                consumer.setConsumeThreadMax(1);        //      consumer.registerMessageListener(new MessageListenerConcurrently() {                consumer.registerMessageListener(new MessageListenerOrderly() {                        @Override                        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {                                List messages = new ArrayList<>();                                for (MessageExt msg : msgs) {                                        messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());                                }                                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);                                return ConsumeOrderlyStatus.SUCCESS;                        }                });                consumer.start();                Thread.currentThread().join();

源码分析:

我们知道在RocketMQ中是可以给一个消费者实例设置多个线程并发消费的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,

那MessageListenerOrderly是如何保证某一个时刻,只有一个消费者的某一个线程在消费某一个MessageQueue的呢?

就在Client模块的 ConsumeMessageOrderlyService里面,消费者端并不是简单的禁止并发处理,而是给每一个Consumer Queue加锁,

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

在消费每个消息之前,需要先获取这个消息对应的Consumer Queue所对应的锁,保证同一个Consumer Queue的消息不会被并发消费,但是不同的Consumer Queue的消息是可以并发处理的。

看完上述内容,你们掌握Message Queue Selector如何实现顺序消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0