52.源代码解读-RocketMQ消息写入机制
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,一. 前言RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程二. 代码流程在Broker启动的时候会拉起相关服务流程如下:流程图引用网址http://blog.csdn.ne
千家信息网最后更新 2025年12月01日52.源代码解读-RocketMQ消息写入机制
一. 前言
RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程
二. 代码流程
在Broker启动的时候会拉起相关服务
流程如下:

流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000
三. 代码流程
由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。
Broker启动流程
以发送消息为例
1. Broker启动注册发送消息处理器
Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:
public void registerProcessor() { /** * SendMessageProcessor */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);}2. 消息处理器处理发送者发送过来的消息
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { @Override public RemotingCommand proce***equest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; ... switch (request.getCode()) { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } }} 继续看sendMessage..
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, ... PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);} 调用MessageStore.putMessage(msgInner)
消息
流程
存储
代码
处理
入口
处理器
时候
分析
源代码
重要
内存
前言
发送者
是在
流程图
磁盘
网址
参考
服务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
sql如何查数据库哪个锁表了
软件开发公司封闭开发管理
计算机网络技术小作文
江阴创新网络技术创新服务
大型网络安全事件溯源
软件开发人月费用标准文件
企业征信数据库的管理
5g网络技术相关股票
珠海通讯软件开发厂家直销
分布式数据库在石油领域的应用
c访问sql数据库
光猫服务器名称怎么查
在建立网络安全监测预警和
手游用什么语言和软件开发
有必要用到云服务器吗
nba发展联盟球员数据库
淘宝物流数据库查询
数据库管理技术课本
it技术软件开发
软件开发过程中的版本
汽车供应链网络安全
蜗牛移动服务器维护需要多久
重装系统后数据库恢复
学微电子想转软件开发
4s网络安全标准
东莞软件开发职校
深圳互联网科技城
软件开发数据标准规范
大连港隆港网络技术有限公司
点米互联网科技有限公司怎么样