Springboot如何整合RocketMQ收发消息
发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Springboot 整合 Rocke
千家信息网最后更新 2025年11月07日Springboot如何整合RocketMQ收发消息
这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter依赖。
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
yml 配置
application.yml
rocketmq: name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq: producer: group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq: producer: group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private RocketMQTemplate t ; public void send(){ //发送同步消息 t.convertAndSend("Topic1:TagA", "Hello world! "); //发送spring的Message Message message = MessageBuilder.withPayload("Hello Spring message! ").build(); t.send("Topic1:TagA",message); //发送异步消息 t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); //发送顺序消息 t.syncSendOrderly("Topic1", "98456237,创建", "98456237"); t.syncSendOrderly("Topic1", "98456237,支付", "98456237"); t.syncSendOrderly("Topic1", "98456237,完成", "98456237"); }} 消费者
package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); }}测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }}demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private RocketMQTemplate t; public void send(){ Message message = MessageBuilder.withPayload("Hello world").build(); //一旦发送消息,则执行监听器 t.sendMessageInTransaction("Topic2",message,null); } @RocketMQTransactionListener class Lis implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("执行本地事务"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("执行事务回查"); return RocketMQLocalTransactionState.COMMIT; } }} 消费者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); }}测试类
package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }}关于"Springboot如何整合RocketMQ收发消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
消息
生产者
消费
生产
整合
事务
消费者
篇文章
测试
更多
顺序
不错
实用
普通
成功
内容
对象
数据
文件
文件夹
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
宝山区正规软件开发服务保障
lw服务器
企业网络技术开发
网络技术大赛获奖题目
网络安全运维的课程
南宁惠普服务器
烈熊网络技术有限公司干什么的
软件开发 英语
网络技术三级总结
河南质量软件开发推广
网络安全语言800字
管理服务器角色
恩牛网络技术有限公司招聘
软件开发公司考勤制度
阳泉天创软件开发有限公司
周鸿伟说网络安全
专升本网络技术有什么要求
jsp数据库连接代码写在哪
华为服务器芯片生产
江苏工程软件开发定制价格
河南浩瀚星云网络技术
网络技术与培训宣传
上古世纪怎么选服务器
联想服务器默认超级密码
国际服服务器怎么登录
谈谈网络安全为什么重要论文
udp服务器失败
我国网络安全法
服务器站建立
软件开发 残疾人