千家信息网

一次KAFKA消费者异常引起的思考

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,问题描述:线上出现一台服务器特别慢,于是关闭了服务器上的kafka broker. 关闭后发现一些kafka consumer无法正常消费数据了, 日志错误:o.a.kakfa.clients.con
千家信息网最后更新 2025年12月01日一次KAFKA消费者异常引起的思考
问题描述:

线上出现一台服务器特别慢,于是关闭了服务器上的kafka broker. 关闭后发现一些kafka consumer无法正常消费数据了, 日志错误:
o.a.kakfa.clients.consumer.internals.AbstractCordinator Marking the coordinator (39.0.2.100) as dead.

原因:

经过一番排查,发现consumer group信息:
(kafka.coordinator.GroupMetadataMessageFormatter类型):
groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)],
存到了KAFKA内部topic: __consumer_offsets里, , 它的key是 groupId.
同时发现broker 参数 offsets.topic.replication.factor 错误地被设置为1. 这个参数表示TOPIC: __Consumer_offsets 的副本数. 这样一旦某个broker被关闭, 如果被关闭的Broker 是__Consumer_offsets的某些partition的Leader. 则导致某些consumer group 不可用. 如果一旦broker已经启动, 需要手工通过命令行来扩展副本数.

reassignment.json:{"version":1, "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}]}kafka-reassign-partitions  --zookeeper localhost:2818 --reassignment-json-file  reassignment.json --execute

客户端寻找Consumer Coordinator的过程:
客户端 org.apache.kafka.clients.consumer.internals.AbstractCoordinator
如果Coordinator 未知 (AbstractCoordinator.coordinatorUnknown()), 发起请求 lookupCoordinator,向负载最低的节点发送FindCoordinatorRequest

服务端 KafkaApis.handleFindCoordinatorRequest 接收请求:
首先调用 GroupMetaManager.partitionFor(consumerGroupId) consunerGroupId 的 hashCode 对 __consumer_offsets 总的分片数取模获取partition id 再从 __consumer_offset 这个Topic 中找到partition对应的 Partition Metadata, 并且获取对应的Partition leader 返回给客户端

引伸思考

KAFKA 的failover机制究竟是怎么样的?假使 __consumer_offset 设置了正确的副本数,重选举的过程是怎样的. 如果broker宕机后导致某些副本不可用, 副本会自动迁移到其他节点吗?带着这些问题稍微阅读了一下KAFKA的相关代码:

当一个Broker 被关掉时, 会有两步操作:
KafkaController.onBrokerFailure ->KafkaController.onReplicasBecomeOffline
主要是通过 PartitionStateMachine.handleStateChanges 方法通知Partition状态机将状态置为offline. ReplicaStateMachine.handleStateChanges方法会将Replica 状态修改为OfflineReplica, 同时修改partition ISR. 如果被关闭broker 是partition leader 那么需要重新触发partition leader 选举,最后发送LeaderAndIsrRequest获取最新的Leader ISR 信息.
KafkaController.unregisterBrokerModificationsHandler 取消注册的BrokerModificationsHandler 并取消zookeeper 中broker 事件的监听.

当ISR请求被发出,KafkaApis.handleLeaderAndIsrRequest() 会被调用. 这里如果需要变更leader的partition是属于__consumer_offset这个特殊的topic,取决于当前的broker节点是不是partition leader. 会分别调用GroupCoordinator.handleGroupImmigrationGroupCoordinator.handleGroupEmmigration. 如果是partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition 会重新从 __consumer_offset 读取group数据到本地metadata cache, 如果是partition follower, GroupCoordniator.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition 会从metadata cache中移除group信息. 并在onGroupUnloaded回调函数中将group的状态变更为dead. 同时通知所有等待join或者sync的组成员.

KAFKA在Broker关闭时不会自动做partition 副本的迁移. 这时被关闭的Broker上的副本变为under replicated 状态. 这种状态将持续直到Broker被重新拉起并且追上新的数据, 或者用户通过命令行 手动复制副本到其他节点.

官方建议设置两个参数来保证graceful shutdown. controlled.shutdown.enable=true auto.leader.rebalance.enable=true前者保证关机之前将日志数据同步到磁盘,并进行重选举. 后者保证在broker重新恢复后再次获得宕机前leader状态. 避免leader分配不均匀导致读写热点.

Reference

https://blog.csdn.net/zhanglh046/article/details/72833129
https://blog.csdn.net/huochen1994/article/details/80511038
https://www.jianshu.com/p/1aba6e226763

副本 状态 数据 节点 信息 参数 同时 客户 客户端 保证 服务 选举 命令 方法 日志 服务器 过程 错误 问题 消费 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 如何查看数据库数据是否存在乱码 互联网科技报文字图片 哈尔滨市一乐软件开发 软件开发工资很低咋办 佛山银飞网络技术有佛山 db2 查询数据库 数据库系统管理的意义 江苏手机直播app软件开发 西安西西弗斯软件开发哪家好 实时数据库的特点 cmd查看服务器上的文本内容 中小型企业erp软件开发 深圳赛盈网络技术有限公司 全栈网络安全专家百度云 rpc服务器导致无法开机 抗疫用了哪些网络技术 数据库有索引导入很慢 数据库带端口的怎么连接 南充市老虎云网络技术 带数据库的程序 方舟生存进化手游服务器自动重启 小葡萄网络安全技能大赛 网络安全隐患排查治理月报 opcua服务器直接连接plc 弹性 网络安全 手机怎么修改网络安全密码 王牌竞速为什么连接不了服务器 无线网络安全性选择 软件开发优秀员工评选标准 你对计算机网络技术了解多少
0