使用RocketMQ怎么对消息进行处理
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这期内容当中小编将会给大家带来有关使用RocketMQ怎么对消息进行处理,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息发送(生产者)以maven + Sprin
千家信息网最后更新 2025年12月02日使用RocketMQ怎么对消息进行处理
这期内容当中小编将会给大家带来有关使用RocketMQ怎么对消息进行处理,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
消息发送(生产者)
以maven + SpringBoot 工程为例,先在pom.xml增加依赖
org.apache.rocketmq rocketmq-spring-boot-starter 2.0.1
由于,这个依赖是一个starter,直接引入依赖就可以开始写投递消息的代码了。这个starter注册了一个叫org.apache.rocketmq.spring.core.RocketMQTemplate的bean,用它就可以直接把消息投递出去。 具体的API是这样的
XXXEvent xxxDto = new XXXEvent(); Messagemessage = MessageBuilder.withPayload(xxxDto).build(); String dest = String.format("%s:%s",topic-name","tag-name"); //默认投递:同步发送 不会丢失消息。如果在投递成功后发生网络异常,客户端会认为投递失败而回滚本地事务 this.rocketMQTemplate.send(dest, xxxDto);
这种投递方式能保证投递成功的消息不会丢失,但是不能保证投递一定成功。假设一次调用的流程是这样的
如果在步骤3的时候发生错误,因为出错mqClient会认为消息投递失败而把事务回滚。如果消息已经被消费,那就会导致业务错误。我们可以用事务消息解决这个问题。
以带事务方式投递的消息,正常情况下的处理流程是这样的
出错的时候是这样的
由于普通消息没有消息回查,普通消息用的producer不支持回查操作,不同业务的回查处理也不一样,事务消息需要使用单独的producer。消息发送代码大概是这样的
//调用这段代码之前别做会影响数据的操作XXXEvent xxxDto = new XXXEvent();Messagemessage = MessageBuilder.withPayload(xxxDto).build();String dest = String.format("%s:%s",topic-name","tag-name");TransactionSendResult transactionSendResult = this.rocketMQTemplate.sendMessageInTransaction("poducer-name","topic-name:tag-name",message,"xxxid");if (LocalTransactionState.ROLLBACK_MESSAGE.equals(transactionSendResult.getLocalTransactionState())){ throw new RuntimeException("事务消息投递失败");}//按照RocketMQ的写法,这个地方不应该有别的代码
@RocketMQTransactionListener(txProducerGroup = "producer") class TransactionListenerImpl implements RocketMQLocalTransactionListener { //消息投递成功后执行的逻辑(半消息) //原文:When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try{ // xxxService.doSomething(); return RocketMQLocalTransactionState.COMMIT; catch(IOException e){ //不确定最终是否成功 return RocketMQLocalTransactionState.UNKNOWN; }catch(Exception e){ return RocketMQLocalTransactionState.ROLLBACK; } } //回查事务执行状态 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { Boolean result = xxxService.isSuccess(msg,arg); if(result != null){ if(result){ return RocketMQLocalTransactionState.COMMIT; }else{ return RocketMQLocalTransactionState.ROLLBACK; } } return RocketMQLocalTransactionState.UNKNOWN; } }处理消息(消费)
普通消息和事务消息的区别只在投递的时候才明显,对应的消费端代码比较简单
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Component;@Slf4j@Component@RocketMQMessageListener(consumerGroup = "xxx-consumer", topic = "topic-name",selectorExpression = "tag-name")public class XXXEventMQListener implements RocketMQListener{ private String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s"; @Autowired private StringRedisTemplate redisTemplate; @Override public void onMessage(XXXEvent message) { log.info("consumer message {}",message); //处理消息 try{ xxxService.doSomething(message); }catch(Exception ex){ log.warn(String.format("message [%s] 消费失败",message),ex); //抛出异常后,MQClient会返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这条消息会再次尝试消费 throw new RuntimException(ex); } }}
RocketMQ用ACK机制保证NameServer知道消息是否被消费在org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer里是这么处理的
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); rocketMQListener.onMessage(doConvertMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }} 上述就是小编为大家分享的使用RocketMQ怎么对消息进行处理了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
消息
事务
处理
消费
成功
代码
普通
时候
保证
业务
内容
方式
流程
错误
分析
不同
专业
中小
内容丰富
再次
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
揭阳软件开发去哪
软件开发要学会的知识
自己创建软件开发团队
不动产登记数据库建设方案
高新区一站式网络技术市面价
河北大学生网络安全大赛
幻璃镜 连接服务器失败
诛仙3新服务器推荐
软件开发公司招会计可靠吗
数据库软件工程师考试题
知网研学的数据库
武汉博纳领航网络技术
设备管理的服务器地址是什么
员工表数据库
插接式数据库
小程序转成链接需要挂在服务器吗
我心中网络安全
网络安全+答题游戏
盖娅互娱是外国服务器公司吗
网络安全需要多长时间精通
维普科技期刊数据库
高天 石家庄网络安全
关于网络安全应急处置的表述
数据库显示字段相同
c 有什么软件开发
数据库重装后
我的世界呆呆联机服务器
通信网络安全详细设计步骤
闵行区品牌软件开发咨询热线
成年人注意的网络安全