千家信息网

rocketmq中DefaultRocketMQListenerContainer的原理及用法

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要介绍"rocketmq中DefaultRocketMQListenerContainer的原理及用法",在日常操作中,相信很多人在rocketmq中DefaultRocketMQListe
千家信息网最后更新 2025年12月01日rocketmq中DefaultRocketMQListenerContainer的原理及用法

这篇文章主要介绍"rocketmq中DefaultRocketMQListenerContainer的原理及用法",在日常操作中,相信很多人在rocketmq中DefaultRocketMQListenerContainer的原理及用法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"rocketmq中DefaultRocketMQListenerContainer的原理及用法"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

本文主要研究一下rocketmq的DefaultRocketMQListenerContainer

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);    private ApplicationContext applicationContext;    /**     * The name of the DefaultRocketMQListenerContainer instance     */    private String name;    private long suspendCurrentQueueTimeMillis = 1000;    /**     * Message consume retry strategy
-1,no retry,put into DLQ directly
0,broker control retry frequency
* >0,client control retry frequency. */ private int delayLevelWhenNextConsume = 0; private String nameServer; private AccessChannel accessChannel = AccessChannel.LOCAL; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private ObjectMapper objectMapper; private RocketMQListener rocketMQListener; private RocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; private Class messageType; private boolean running; // The following properties came from @RocketMQMessageListener. private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; private long consumeTimeout; //...... public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.rocketMQMessageListener = anno; this.consumeMode = anno.consumeMode(); this.consumeThreadMax = anno.consumeThreadMax(); this.messageModel = anno.messageModel(); this.selectorExpression = anno.selector_Expression(); this.selectorType = anno.selectorType(); this.consumeTimeout = anno.consumeTimeout(); } @Override public void setupMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } @Override public void destroy() { this.setRunning(false); if (Objects.nonNull(consumer)) { consumer.shutdown(); } log.info("container destroyed, {}", this.toString()); } @Override public boolean isAutoStartup() { return true; } @Override public void stop(Runnable callback) { stop(); callback.run(); } @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } @Override public void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { consumer.shutdown(); } setRunning(false); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } @Override public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } @Override public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer(); this.messageType = getMessageType(); log.debug("RocketMQ messageType: {}", messageType.getName()); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public String toString() { return "DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + messageModel + '}'; } private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); consumer.setInstanceName(this.name); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } private Class getMessageType() { Class targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); Type[] interfaces = targetClass.getGenericInterfaces(); Class superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class) actualTypeArguments[0]; } else { return Object.class; } } } } return Object.class; } else { return Object.class; } } //......}
  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout

  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法

  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

DefaultMessageListenerConcurrently

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {        @SuppressWarnings("unchecked")        @Override        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {            for (MessageExt messageExt : msgs) {                log.debug("received msg: {}", messageExt);                try {                    long now = System.currentTimeMillis();                    rocketMQListener.onMessage(doConvertMessage(messageExt));                    long costTime = System.currentTimeMillis() - now;                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                } catch (Exception e) {                    log.warn("consume message failed. messageExt:{}", messageExt, e);                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;                }            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    }
  • DefaultMessageListenerConcurrently方法实现了MessageListenerConcurrently接口;它的consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回ConsumeConcurrentlyStatus.RECONSUME_LATER

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {        @SuppressWarnings("unchecked")        @Override        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {            for (MessageExt messageExt : msgs) {                log.debug("received msg: {}", messageExt);                try {                    long now = System.currentTimeMillis();                    rocketMQListener.onMessage(doConvertMessage(messageExt));                    long costTime = System.currentTimeMillis() - now;                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                } catch (Exception e) {                    log.warn("consume message failed. messageExt:{}", messageExt, e);                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                }            }            return ConsumeOrderlyStatus.SUCCESS;        }    }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦异常则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

小结

  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout

  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法

  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

到此,关于"rocketmq中DefaultRocketMQListenerContainer的原理及用法"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

方法 原理 接口 学习 不同 成功 信息 更多 注解 类型 帮助 循环 配置 实用 接下来 小结 文章 理论 知识 篇文章 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 怎么卸载qq所有数据库 服务器rd授权管理器 嵌入式底层软件开发实训报告 栾城区高科技软件开发服务供应 双色球如何看到售票数据库 易语言数据库编程 合肥市网络安全大赛 武汉物流软件开发平台 闵行区互联网软件开发诚信经营 网络安全法宣讲 用友app登录不上服务器 网络安全员的自我介绍 网络安全的问题有哪些方面 数据库复习必考题b站 万方数据库可以提供哪种文献信息 买一个服务器家用能带几台电脑 天融信网络安全员工资 淮安营销软件开发咨询热线 对网络安全法的见解 局域网数据库怎么打开 网络安全但无internet 北京易加网网络技术有限公司 打印服务器电脑是正常电脑吗 安装linux服务器 香港服务器免备案多少钱一个月 胶州微信公众号软件开发企业 计算机网络技术可以考一建 联想小新进行软件开发 成都物流软件开发公司 上位机软件开发哪些语言好
0