千家信息网

52.源代码解读-RocketMQ消息写入机制

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,一. 前言RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程二. 代码流程在Broker启动的时候会拉起相关服务流程如下:流程图引用网址http://blog.csdn.ne
千家信息网最后更新 2025年12月01日52.源代码解读-RocketMQ消息写入机制

一. 前言

RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程

二. 代码流程

在Broker启动的时候会拉起相关服务
流程如下:

流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000

三. 代码流程

由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。

Broker启动流程

以发送消息为例

1. Broker启动注册发送消息处理器

Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:

public void registerProcessor() {        /**         * SendMessageProcessor         */        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);        sendProcessor.registerSendMessageHook(sendMessageHookList);        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);}

2. 消息处理器处理发送者发送过来的消息

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {    @Override    public RemotingCommand proce***equest(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {        SendMessageContext mqtraceContext;                ...        switch (request.getCode()) {            response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);        }    }}       

继续看sendMessage..

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,        ...        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);}               

调用MessageStore.putMessage(msgInner)

0