consumer数量变化会怎样
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章给大家分享的是有关consumer数量变化会怎样,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。ConsumerManagerpu
千家信息网最后更新 2025年12月03日consumer数量变化会怎样
本篇文章给大家分享的是有关consumer数量变化会怎样,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

ConsumerManagerpublic boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final SetsubList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { //通知同组内的其他consumer this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2;}public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { consumerGroupInfo.unregisterChannel(clientChannelInfo); if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group); } } if (isNotifyConsumerIdsChangedEnable) { //单向通知channel this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } }}
DefaultConsumerIdsChangeListener@Overridepublic void handle(ConsumerGroupEvent event, String group, Object... args) { case CHANGE: if (args == null || args.length < 1) { return; } List channels = (List) args[0]; if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { //对组内的其他consumer的channel连接发送单向通知(不管对方有木有收到) for (Channel chl : channels) { this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } } break;} Broker2Clientpublic void notifyConsumerIdsChanged( final Channel channel, final String consumerGroup) { if (null == consumerGroup) { log.error("notifyConsumerIdsChanged consumerGroup is null"); return; } NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { //发送异常,只是打印log log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); }}通知channel是单向的,也就是不管对方有没有答复,都认为发送成功了,这样会有两种情况发生:
channel收到消息:收到消息后,channel会触发rebalance,正常逻辑
channel没收到消息:该consumer不会触发rebalance,存在问题!
register:该consumer不知道已经有新的consumer加入,造成同一个mq会有多个consumer进行消费
unregister:该consumer不知道有consumer下线,造成部分mq没有consumer负责消费
我们先看unregister这种情况
在consumer启动时,会同时启动一个RebalanceService线程,这个线程做的事就是每隔20秒主动进行一次rebalance,这样就能把unregister这种影响降低,最多导致该mq的消息会延迟20秒之后才有consumer负责消费
RebalanceServiceprivate static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));@Overridepublic void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end");}接下来分析比较大条的Register
同一个mq在同一组内有不同的consumer消费,这种情况在clustering模式下是有大问题的,会造成重复消费,消费进度错误等问题,带着rocketmq应该不至于犯如此低级错误的想法再继续看代码,果然别有洞天
RebalanceImplprivate void rebalanceByTopic(final String topic, final boolean isOrder) { //rebalance过程 //关键点在这,在上面rebalance完之后, 就能知道自己该负责哪些mq的消费 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);}private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) { for (MessageQueue mq : mqSet) { //如果是新增的mq,会尝试调用远程broker lock mq,获取锁失败,则说明有其他consumer获取了锁,自己应该放弃消费该mq if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } } }} 以上就是consumer数量变化会怎样,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
消费
消息
单向
情况
问题
数量
变化
对方
就是
更多
知识
篇文章
线程
部分
错误
不同
低级
实用
主动
成功
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
企业网络安全工作责任制度
安卓软件开发用软件
数据库的技术及运用
数据库清除单据语句
如何避免需求和软件开发不一致
服务器组策略管理器
人工管理阶段数据库的特点
商品房销售管理系统数据库
我的世界服务器管理员方块模组
北辰区电子网络技术售后服务
自己制作的服务器
方舟 非专用多人服务器
软件开发可以考红帽吗
数据库中的r-s
ps5版cod15连接服务器
服务器 360安全模式
美国软件开发平均年薪
小学网络安全工作小组
浙江通信网络安全防护方案设计
电网公司信息网络安全顺口溜
发票机安全接入服务器
监控服务器开机需要多长时间
贵阳多媒体安全文化展厅软件开发
口碑好的存储服务器哪里有
泰格收银软件连接不上数据库
opc服务器模拟软件
荆州软件开发招生
山东土地资产管理软件开发
数据库多表查询字段
数据库管理与日常维护说明