RocketMQ中如何实现push consumer顺序消费
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。顺序消费的
千家信息网最后更新 2025年12月01日RocketMQ中如何实现push consumer顺序消费
这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:
在broker对message queue加锁,保证当前client占有该队列
consumer端对MessageQueue加锁,保证当前线程占有该队列
consumer端对ProcessQueue加锁,保证当前线程占有该队列
对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:
// ConsumeMessageOrderlySerivce public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } public synchronized void lockMQPeriodically() { if (!this.stopped) { // 通过LOCK_BATCH_MQ请求在broker批量锁定mq this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }ConsumeMessageOrderlyService中的消费请求提交:
// ConsumeMessageOrderlySerivce public void submitConsumeRequest( final Listmsgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
顺序处理了逻辑:
// ConsumeMessageOrderlyService.ConsumeRequest public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } // 1.获取MessageQueue上的锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 循环处理 // ... // 单个ConsumeRequest最长处理时间默认60s long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 2. 从ProcessQueue顺序获取batchSize个消息 List msgs = this.processQueue.takeMessags(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null; ConsumeMessageContext consumeMessageContext = null; // .... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { // 3. 获取ProcessQueue上的锁 this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } // 4. 推给业务处理逻辑 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); // 解锁 } // ... // 5. 处理消费结果 continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; // ProcessQueue为空,停止本次推送 } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } } 在processConsumeResult中主要会执行2步操作:
在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除
更新OffsetStore
感谢你能够认真阅读完这篇文章,希望小编分享的"RocketMQ中如何实现push consumer顺序消费"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
顺序
消费
处理
篇文章
逻辑
保证
线程
最长
有序
业务
价值
兴趣
单个
同时
周期
周期性
时间
是在
更多
有序性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
我的世界虎牙服务器是什么版本
数据库 回滚日志 归档日志
魔兽7.1数据库
湖北多功能软件开发费用
数据库字段为空怎么放到map中
从江无线网络技术
网络安全观后感1500
大型工控软件开发
香港软件开发合同
杜绝出现网络安全
冯玉 数据库
自由好玩沙雕的我的世界服务器
魅影传奇没有服务器
中天农村土地调查数据库怎么截屏
昆山正规软件开发口碑推荐
上海飞旗网络技术招聘
计算机网络技术谢希仁第五版
网络安全 会议 征稿 2022
oracle数据库执行语句教程
sql数据库表备份方法
光纤报警数据采集服务器
情绪面孔数据库
html页面间传数据库
数据库编辑版权文件
数据库对我们来说有什么意义
西安四叶草网络安全薪资
交警网络安全管理员组织生活会
海南应用软件开发有哪些
育碧什么软件开发游戏的
embase数据库检索式