RocketMQ中broker消息存储之如何实现消息转储
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下RocketMQ中broker消息存储之如何实现消息转储,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!broker在接收到producer发送的消息之后,首先会将消
千家信息网最后更新 2025年12月02日RocketMQ中broker消息存储之如何实现消息转储
小编给大家分享一下RocketMQ中broker消息存储之如何实现消息转储,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
broker在接收到producer发送的消息之后,首先会将消息存储到CommitLog的末尾,然后通过一个异步的分发线程ReputMessageService将消息转储到ConsumeQueue以及IndexFile中。
转储的核心逻辑在ReputMessageService.doReput中:
// DefaultMessageStore.ReputMessageService private void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); } for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } // 1. 获取reputFromOffset偏移所指向的数据 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 2. 解析消息体 DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { // 3. 执行分发 DefaultMessageStore.this.doDispatch(dispatchRequest); // ... this.reputFromOffset += size; readSize += size; // ... } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { // ... } } } finally { result.release(); } } else { doNext = false; } } }ConsumeQueue的插入操作如下:
// ConsumeQueue.java private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } // 1. 将commitlog offset/msg size/tags code写到内存缓存 this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; // ConsumeQueue中偏移 // 2. 获取最后一个MappedFile MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { // ... this.maxPhysicOffset = offset + size; // 3. 写入索引数据 return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; }IndexFile的写入逻辑如下:
// IndexService.java public void buildIndex(DispatchRequest req) { IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null) { long endPhyOffset = indexFile.getEndPhyOffset(); DispatchRequest msg = req; String topic = msg.getTopic(); String keys = msg.getKeys(); if (msg.getCommitLogOffset() < endPhyOffset) { return; } // ... if (keys != null && keys.length() > 0) { String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); for (int i = 0; i < keyset.length; i++) { // 为每个key执行写入 String key = keyset[i]; if (key.length() > 0) { indexFile = putKey(indexFile, msg, buildKey(topic, key)); if (indexFile == null) { log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); return; } } } } } else { log.error("build index error, stop building index"); } } private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) { for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) { log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one"); indexFile = retryGetAndCreateIndexFile(); // 文件已满,重试 if (null == indexFile) { return null; } ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); } return indexFile; }消息转储的整体流程如下图:
看完了这篇文章,相信你对"RocketMQ中broker消息存储之如何实现消息转储"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
消息
存储
数据
篇文章
逻辑
偏移
内存
完了
指向
整体
文件
更多
末尾
核心
流程
知识
索引
线程
缓存
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
美股网络安全
数据库ifexists
北京中国移动网络技术有限公司
excel 数据库查找
各种主流无线传感网络技术
软件开发功能清单用什么工具
数据库原理参照完整性概念
检查服务器是否正常工作
软件开发与运维主要内容
网络安全大赛攻防操作
普陀区智能化软件开发质量
小学网络安全应急预案方案
奉贤区推广软件开发厂家范围
网络安全大赛在中国的战队
软件开发哪些项目需要学习
软件开发职称怎么写
网络安全法手抄报海报
乡镇网络安全工作总结汇报
智能电视出现服务器问题
数据库 所有ip访问
数据库表初始化语句
手机银行服务器不可用
战地5服务器怎么设置
无人深空发现服务器怎么看
省级数字家庭管理平台服务器认证
饥荒服务器关闭
jsp数据库增删改查
网络技术企业简介范文
布鲁金斯学会 中美网络安全
挂接数据库