RabbitMQ用多路由,多队列来破除流控
发表于:2025-12-06 作者:千家信息网编辑
千家信息网最后更新 2025年12月06日,本篇内容主要讲解"RabbitMQ用多路由,多队列来破除流控",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RabbitMQ用多路由,多队列来破除流控"吧!
千家信息网最后更新 2025年12月06日RabbitMQ用多路由,多队列来破除流控
本篇内容主要讲解"RabbitMQ用多路由,多队列来破除流控",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"RabbitMQ用多路由,多队列来破除流控"吧!
流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。
现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。
我们先给Order接口添加一个发送消息的方法。
public interface Order {public void makeOrder(Order order); public OrderSuccessResult getResult(Order order); public void postOrder(Order order);}实现类实现该方法
@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate; @Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); }@Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); }@Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); }private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; }}其中我们定义了这么一组队列名,交换机,和路由
public interface OwnerCarCenterMq {/** * 队列名 */ String ORDER_QUEUE = "order"; /** * 服务系统exchange名 */ String MQ_EXCHANGE_ORDER = "order.topic.exchange"; /** * 服务添加routing key */ String ROUTING_KEY_ORDER = "post.order";}为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。
@Configurationpublic class RabbitmqConfig { @Bean public List orderQueues() { List queues = new ArrayList<>(); for (int i = 1;i < 11;i++) { Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i); queues.add(queue); } return queues; } @Bean public TopicExchange orderExchange() { return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER); } @Bean public List bindingOrders() { List bindings = new ArrayList<>(); for (int i = 1;i < 11;i++) { Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange()) .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i); bindings.add(binding); } return bindings; }} 重新封装消息提供者,每次发送都随机选取一个路由来进行发送。
@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); ThreadLocalRandom random = ThreadLocalRandom.current(); this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content)); }/** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 对消息对象进行二进制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); }}我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。
Controller如下
@Slf4j@RestControllerpublic class OrderController {private ThreadLocal orderFactory = new ThreadLocal<>(); private ThreadLocal orderService = new ThreadLocal<>(); @Autowired private OrderBean orderBean; @Transactional @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr); Order order = setOrderFactory(orderStr,type); orderService.get().makeOrder(order); orderService.get().postOrder(order); return Result.success(orderService.get().getResult(order)); }/** * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂 * @param orderStr * @return */ private Order setOrderFactory(String orderStr,String type) { Class> classType = orderBean.getOrderMap().get(type); Object order = JSONObject.parseObject(orderStr, classType);// if (orderStr.contains("service")) {// order = JSON.parseObject(orderStr, ServiceOrder.class);// }else if (orderStr.contains("product")) {// order = JSON.parseObject(orderStr, ProductOrder.class);// } Class> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory"); this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));// if (order instanceof ServiceOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));// }else if (order instanceof ProductOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));// } orderService.set(orderFactory.get().getOrder()); return (Order) order; }} 最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控
@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7, OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter private Queue serviceOrders = new ConcurrentLinkedDeque<>(); @RabbitHandler public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); ServiceOrder order = unSerialize(data); this.serviceOrders.add(order); log.info(String.valueOf(order)); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private ServiceOrder unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,ServiceOrder.class); }finally { input.close(); } }} 项目启动后,我们可以看到rabbitmq的情况如下
现我们来对其进行压测,启动Jmeter,我们使用1000线程来进行压测测试。各配置如下
保存文件上传服务器,因为本人是华为云的服务器,故在服务器上进行压测,不进行远程压测
在服务器的jmeter的bin目录下输入
./jmeter -n -t model/rabbit.jmx -l log.jtl
这里-n为不启动图形界面,-t使用我们上传的配置文件,-l记录日志
压测结果如下
我们在压测过程中来看一下rabbitmq的UI界面
消费基本上是实时的,没有出现流控积压现象。
到此,相信大家对"RabbitMQ用多路由,多队列来破除流控"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
消息
服务
队列
服务器
路由
消费
方法
订单
内容
序列
文件
界面
类型
学习
配置
头疼
实用
更深
二进制
交换机
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
关于读秀数据库的检索功能
网络安全管理局就黑卡
ca证书服务器
软件开发假学历
网络安全事件都有哪些形式
广州店铺帮网络技术有限公司客服
网络安全科学院院士
数据库级联更新触发器
郴州市学计算机软件开发培训中心
做信息数据库的申请
会计核算数据库名称怎么填
小成旭软件开发
服务器地址映射到外网安全
网络安全管理先进
医院信息科数据库要怎么维护
无线网络安全检测
网络安全等级保护建设小组
简述在数据库管理技术中
第二届大学生网络安全
access数据库显示表
进口串口设备服务器哪家好
达梦数据库缓存同步
2网络安全ppt
上海智能软件开发成交价
游戏软件开发流程ppt讲解
钉钉公司员工信息对接数据库
数据库文件如何降版本
获取更新服务器失败闪耀暖暖
讯仁网络技术工作室
计算机软件开发需要掌握什么