千家信息网

Java搭建RabbitMq消息中间件过程是怎么样的

发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,这期内容当中小编将会给大家带来有关Java搭建RabbitMq消息中间件过程是怎么样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。前言当系统中出现"生产"和"消费
千家信息网最后更新 2025年11月07日Java搭建RabbitMq消息中间件过程是怎么样的

这期内容当中小编将会给大家带来有关Java搭建RabbitMq消息中间件过程是怎么样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

前言

当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列。

名词

exchange: 交换机 routingkey: 路由key queue:队列

控制台端口:15672

  exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

使用场景

1.技能订单3分钟自动取消,改变状态

2.直播开始前15分钟提醒

3.直播状态自动结束

流程

  生产者发送消息 -> order_pre_exchange交换机 -> order_per_ttl_delay_queue队列

  -> 时间到期 -> order_delay_exchange交换机 -> order_delay_process_queue队列 -> 消费者

第一步:在pom文件中添加

org.springframework.boot spring-boot-starter-amqp

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxxspring.rabbitmq.port=5672spring.rabbitmq.username=rabbitspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000spring.rabbitmq.publisher-confirms=truespring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列) * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Configurationpublic class OrderQueueConfig { /** * 订单缓冲交换机名称 */ public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange"; /** * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】 */ public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue"; /** * 订单的交换机DLX 名字 */ final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange"; /** * 订单message时间过期后进入的队列,也就是订单实际的消费队列 */ public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue"; /** * 订单在缓冲队列过期时间(毫秒)30分钟 */ public final static int ORDER_QUEUE_EXPIRATION = 1800000; /** * 订单缓冲交换机 * * @return */ @Bean public DirectExchange preOrderExange() { return new DirectExchange(ORDER_PRE_EXCHANGE_NAME); } /** * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列 * * @return */ @Bean public Queue delayQueuePerOrderTTLQueue() { return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME) .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间 .build(); } /** * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列 * * @param delayQueuePerOrderTTLQueue * @param preOrderExange * @return */ @Bean public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) { return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME); } /** * 创建订单的DLX exchange * * @return */ @Bean public DirectExchange delayOrderExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME); } /** * 创建order_delay_process_queue队列,也就是订单实际消费队列 * * @return */ @Bean public Queue delayProcessOrderQueue() { return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build(); } /** * 将DLX绑定到实际消费队列 * * @param delayProcessOrderQueue * @param delayExchange * @return */ @Bean public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) { return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME); } /** * 监听订单实际消费者队列order_delay_process_queue * * @param connectionFactory * @param processReceiver * @return */ @Bean public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory, OrderProcessReceiver processReceiver) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container; }}

消费者 OrderProcessReceiver :

package com.tuohang.platform.config;import java.util.Objects;import org.apache.tools.ant.types.resources.selectors.Date;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;/** * 订单延迟处理消费者 * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Componentpublic class OrderProcessReceiver implements ChannelAwareMessageListener { private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @Override public void onMessage(Message message, Channel channel) throws Exception { try { processMessage(message); } catch (Exception e) { // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做 channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes()); } } /** * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received <" + realMessage + ">"); // 取消订单 if(!Objects.equals(realMessage, msg)) {// SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage)); System.out.println("测试111111-----------"+new Date()); System.out.println(message); } }}

或者

/** * 测试 rabbit 消费者 * * * @author Administrator * @version 1.0 * @Date 2018年9月25日 */@Component@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)public class TestProcessReceiver { private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { try { processMessage(message); //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做 channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes()); } } /** * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received < " + realMessage + " >"); // 取消订单 if(!Objects.equals(realMessage, msg)) { System.out.println("测试111111-----------"+new Date()); }else { System.out.println("rabbit else..."); } }}

生产者

/** * 测试rabbitmq * * @return */ @RequestMapping(value = "/testrab") public String testraa() { GenericResult gr = null; try { String name = "test_pre_ttl_delay_queue"; long expiration = 10000;//10s 过期时间 rabbitTemplate.convertAndSend(name,String.valueOf(123456)); // 在单个消息上设置过期时间 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration)); } catch (ServiceException e) { e.printStackTrace(); gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage()); } return getWrite(gr); }

上述就是小编为大家分享的Java搭建RabbitMq消息中间件过程是怎么样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

订单 队列 消息 消费 时间 交换机 缓冲 消费者 内容 实际 处理 测试 生产 生产者 延迟 中间件 过程 也就是 文件 服务器 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 360网络安全实验室数据 烟草软件开发公司 安全数据库系统课程设计 湖南web靶场选择东塔网络安全 软件开发团队中的人到点就走 守望先锋有多少服务器 云服务器集群管理 现代版数据库软件 文科生想报计算机网络技术怎么办 服务器内部构造解释图 车载gps 服务器要求 单片机微型数据库 qq邮箱 服务器安全性 济南应用软件开发哪家公司好 代理IP服务器是什么 数控系统中的网络技术 中小学生守则网络安全教育 ps4都有哪些服务器 网络安全宣传周乱晒照片的后果 项目计划书对软件开发的作用 郑州华夏宏图网络技术有限公司 深圳互联网科技加盟代理 数据库中增加字段长度的字句 如何做好客户网络安全管理工作 哪个城市软件开发商最多 dhcp服务器怎么搭建 qq空间小秘书服务器 初中暑假网络安全内容手抄报 谷歌地球服务器地址 软件开发可以办个体执照吗
0