RocketMQ中broker消息存储之如何实现拉取消息
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在consumer拉取消息时,broker首
千家信息网最后更新 2025年12月02日RocketMQ中broker消息存储之如何实现拉取消息
这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
在consumer拉取消息时,broker首先会根据待拉取的topic+queueId得到对应的ConsumeQueue,再根据消费offset从ConsumeQueue相应的偏移位置中获取该消息在commitlog里真实的offset/msgsize/tagscode信息,最后再从commitlog查出消息体。
消息拉取在broker存储层的调用入口为DefaultMessageStore.getMessage方法。核心逻辑如下:
// DefaultMessageStore.java public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { // ... // 1. 定位ConsumeQueue ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } else { // 2. 从ConsumeQueue中读取消费偏移offset处的内容 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 单个请求最大拉取数据量 final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // commitlog offset 8bytes int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes // ... // 3. 通过tagscode快速过滤 if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } // 4. 从commitlog获取消息体 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } // 5. 通过消息体过滤 if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; } this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); // 6.添加到返回结果 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } // ... } finally { bufferConsumeQueue.release(); } } else { // ... } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } // ... return getResult; }ConsumeQueue中存储的是固定长度(每个消息20字节)的内容,因此访问比较简单:
// ConsumeQueue.java public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消费者offset * 固定20字节长度 if (offset >= this.getMinLogicOffset()) { // 定位到所属的MappedFile MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 从MappedFile中读取实际的数据 return result; } } return null; }通过ConsumeQueue获取消息在commitlog中的偏移量以及消息大小之后,获取消息体的方法如下
// CommitLog.java public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); // 定位消息所在的MappedFile MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); // 从MappedFile中获取消息体 } return null; }消息拉取整体流程如下
感谢各位的阅读!关于"RocketMQ中broker消息存储之如何实现拉取消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
消息
存储
内容
偏移
定位
消费
字节
数据
方法
更多
篇文章
长度
不错
实用
最大
位置
信息
入口
单个
大小
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
discuz 数据库地址
华为服务器开机不引导
云巅科技软件开发有限公司
杭州快睿网络技术公司
tag图数据库
软件开发中什么时候用数组
工业软件开发入口
倩女幽魂手游哪个服务器最好
艾洛裳网络技术有限公司
重庆大坪医院网络安全
2000万数据库怎么看
操作系统在软件开发中的应用
广西户外广告机软件开发
迷你世界的玩家去炸mc的服务器
数据库有几种分页方式
互联网 农业科技
数据库好弄吗
小程序服务器到期怎么换
丰台区网络技术咨询职责
从高到低排序 数据库
华为服务器开机不引导
网络安全卡通手抄报
美版我的世界起床战争服务器
数据库创建关系引用字
职业杀手小说软件开发
速达软件数据库注册表删除
厦门市蓝谷网络技术有限公司
郑州联宇网络技术
数据库新标准
浪潮服务器nf5270m6