springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么
发表于:2025-11-13 作者:千家信息网编辑
千家信息网最后更新 2025年11月13日,本篇内容主要讲解"springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"spr
千家信息网最后更新 2025年11月13日springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么
本篇内容主要讲解"springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么"吧!
1.application.yml
server: port: 8184spring: application: name: rabbitmq-demo rabbitmq: host: 127.0.0.1 # ip地址 port: 5672 username: admin # 连接账号 password: 123456 # 连接密码 template: retry: enabled: true # 开启失败重试 initial-interval: 10000ms # 第一次重试的间隔时长 max-interval: 300000ms # 最长重试间隔,超过这个间隔将不再重试 multiplier: 2 # 下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍 exchange: topic.exchange # 缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个 publisher-confirm-type: correlated # 生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试 publisher-returns: true listener: type: simple simple: acknowledge-mode: manual prefetch: 1 # 限制每次发送一条数据。 concurrency: 3 # 同一个队列启动几个消费者 max-concurrency: 3 # 启动消费者最大数量 # 重试策略相关配置 retry: enabled: true # 是否支持重试 max-attempts: 5 stateless: false multiplier: 1.0 # 时间策略乘数因子 initial-interval: 1000ms max-interval: 10000ms default-requeue-rejected: true
2.pom.xml引入依赖
org.springframework.boot spring-boot-starter-amqp
3.常量类创建
/** * @author kkp * @ClassName RabbitMqConstants * @date 2021/11/3 14:16 * @Description */public class RabbitMqConstants { public final static String TEST1_QUEUE = "test1-queue"; public final static String TEST2_QUEUE = "test2-queue"; public final static String EXCHANGE_NAME = "test.topic.exchange"; /** * routingKey1 */ public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*"; public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test"; /** * routingKey1 */ public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*"; public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";}4.配置Configuration
import com.example.demo.common.RabbitMqConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;/** * @author kkp * @ClassName RabbitMqConfig * @date 2021/11/3 14:16 * @Description */@Slf4j@Configurationpublic class RabbitMqConfig { @Autowired private CachingConnectionFactory connectionFactory; /** * 声明交换机 */ @Bean(RabbitMqConstants.EXCHANGE_NAME) public Exchange exchange(){ //durable(true) 持久化,mq重启之后交换机还在 // Topic模式 //return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build(); //发布订阅模式 return ExchangeBuilder.fanoutExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build(); } /** * 声明队列 * new Queue(QUEUE_EMAIL,true,false,false) * durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 * auto-delete 表示消息队列没有在使用时将被自动删除 默认是false * exclusive 表示该消息队列是否只在当前connection生效,默认是false */ @Bean(RabbitMqConstants.TEST1_QUEUE) public Queue esQueue() { return new Queue(RabbitMqConstants.TEST1_QUEUE); } /** * 声明队列 */ @Bean(RabbitMqConstants.TEST2_QUEUE) public Queue gitalkQueue() { return new Queue(RabbitMqConstants.TEST2_QUEUE); } /** * TEST1_QUEUE队列绑定交换机,指定routingKey */ @Bean public Binding bindingEs(@Qualifier(RabbitMqConstants.TEST1_QUEUE) Queue queue, @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs(); } /** * TEST2_QUEUE队列绑定交换机,指定routingKey */ @Bean public Binding bindingGitalk(@Qualifier(RabbitMqConstants.TEST2_QUEUE) Queue queue, @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs(); } /** * 如果需要在生产者需要消息发送后的回调, * 需要对rabbitTemplate设置ConfirmCallback对象, * 由于不同的生产者需要对应不同的ConfirmCallback, * 如果rabbitTemplate设置为单例bean, * 则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。 * @return */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); return template; }}5.Rabbit工具类创建
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.UUID;/** * @author kkp * @ClassName RabbitMqUtils * @date 2021/11/3 14:21 * @Description */@Slf4j@Componentpublic class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ private RabbitTemplate rabbitTemplate; /** * 构造方法注入 */ @Autowired public RabbitMqUtils(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //这是是设置回调能收到发送到响应 rabbitTemplate.setConfirmCallback(this); //如果设置备份队列则不起作用 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); } /** * 回调确认 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); }else{ log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } } /** * 消息发送到转换器的时候没有对列,配置了备份对列该回调则不生效 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } /** * 发送到指定Queue * @param queueName * @param obj */ public void send(String queueName, Object obj){ CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(queueName, obj, correlationId); } /** * 1、交换机名称 * 2、routingKey * 3、消息内容 */ public void sendByRoutingKey(String exChange, String routingKey, Object obj){ CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId); }}6.service创建
public interface TestService { String sendTest1(String content); String sendTest2(String content);}7.impl实现
import com.example.demo.common.RabbitMqConstants;import com.example.demo.util.RabbitMqUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;/** * @author kkp * @ClassName TestServiceImpl * @date 2021/11/3 14:24 * @Description */@Service@Slf4jpublic class TestServiceImpl implements TestService { @Autowired private RabbitMqUtils rabbitMqUtils; @Override public String sendTest1(String content) { rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME, RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content); log.info(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST+"***************发送成功*****************"); return "发送成功!"; } @Override public String sendTest2(String content) { rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME, RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content); log.info(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST+"***************发送成功*****************"); return "发送成功!"; }}8.监听类
import com.example.demo.common.RabbitMqConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;/** * @author kkp * @ClassName RabbitMqListener * @date 2021/11/3 14:22 * @Description */@Slf4j@Componentpublic class RabbitMqListener { @RabbitListener(queues = RabbitMqConstants.TEST1_QUEUE) public void test1Consumer(Message message, Channel channel) { try { //手动确认消息已经被消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("Counsoum1消费消息:" + message.toString() + "。成功!"); } catch (Exception e) { e.printStackTrace(); log.info("Counsoum1消费消息:" + message.toString() + "。失败!"); } } @RabbitListener(queues = RabbitMqConstants.TEST2_QUEUE) public void test2Consumer(Message message, Channel channel) { try { //手动确认消息已经被消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("Counsoum2消费消息:" + message.toString() + "。成功!"); } catch (Exception e) { e.printStackTrace(); log.info("Counsoum2消费消息:" + message.toString() + "。失败!"); } }}9.Controller测试
import com.example.demo.server.TestService;import jdk.nashorn.internal.objects.annotations.Getter;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.util.Map;/** * @author kkp * @ClassName TestController * @date 2021/11/3 14:25 * @Description */@Slf4j@RestController@RequestMapping("/enterprise")public class TestController { @Autowired private TestService testService; @GetMapping("/finance") public String hello3(@RequestParam(required = false) Map params) { return testService.sendTest2(params.get("entId").toString()); } /** * 发送消息test2 * @param content * @return */ @PostMapping(value = "/finance2") public String sendTest2(@RequestBody String content) { return testService.sendTest2(content); }} 到此,相信大家对"springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
消息
队列
消费
成功
交换机
方法
模式
配置
主题
内容
不同
名称
备份
实际
手动
时候
消费者
生产者
策略
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
oracle数据库连不上的问题
湖北创新网络技术服务代理品牌
poc是啥意思 软件开发
字节跳动企业网络服务器
软件开发师答案
软件开发公司哪儿好
软件开发瀑布流方法
与数据库相关的新技术新应用
数据库和数据仓库连接方式
互联网科技与政府治理社科院
哈工大数据库慕课第七章答案
瓜子二手车软件开发薪资
服务器怎么防护攻击
手机软件开发游戏工具下载
不能生成默认数据库啥原因
计算机网络技术icmp
计算机网络技术求职信息
神经网络技术概念
杭州临安灵迅网络技术经营部
易赛诺青岛网络技术
互联网金融科技工作会议
服务器格言
数据库课程心得
网络技术发展的方向
用户所有信息数据库
沧州支付软件开发
数据库中产品编号怎么写
sql 显示数据库中的表
软件需求工程对软件开发的影响
易赛诺青岛网络技术