Storm-kafka中如何理解ZkCoordinator的过程
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。梳理ZkCoordin
千家信息网最后更新 2025年12月02日Storm-kafka中如何理解ZkCoordinator的过程
Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
梳理ZkCoordinator的过程
package com.mixbox.storm.kafka;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;import java.util.*;import static com.mixbox.storm.kafka.KafkaUtils.taskId;/** * * * ZKCoordinator 协调器 * * @author Yin Shuai */public class ZkCoordinator implements PartitionCoordinator { public static final Logger LOG = LoggerFactory .getLogger(ZkCoordinator.class); SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; String _topologyInstanceId; // 每一个分区对应着一个分区管理器 Map _managers = new HashMap(); //缓存的List List _cachedList; //上次刷新的时间 Long _lastRefreshTime = null; //刷新频率 毫秒 int _refreshFreqMs; //动态分区连接 DynamicPartitionConnections _connections; //动态BrokersReader DynamicBrokersReader _reader; ZkState _state; Map _stormConf; /** * * @param connections * 动态的 分区连接 * @param stormConf * Storm的配置文件 * @param spoutConfig * Storm sput的配置文件 * @param state * 对于ZKState的连接 * @param taskIndex * 任务 * @param totalTasks * 总共的任务 * @param topologyInstanceId * 拓扑的实例ID */ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); } public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; } /** * @param stormConf * @param spoutConfig * @return */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); } @Override public List getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; } /** * 简单的刷新的行为 * */ void refresh() { try { LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); // 拿到所有的分区信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); // 拿到自己任务的所有分区 List mine = KafkaUtils.calculatePartitionsForTask( brokerInfo, _totalTasks, _taskIndex); // 拿到当前任务的分区 Set curr = _managers.keySet(); // 构造一个集合 Set newPartitions = new HashSet(mine); // 在new分区中,移除掉所有 自己拥有的分区 newPartitions.removeAll(curr); // 要删除的分区 Set deletedPartitions = new HashSet(curr); // deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); for (Partition id : deletedPartitions) { PartitionManager man = _managers.remove(id); man.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } } catch (Exception e) { throw new RuntimeException(e); } _cachedList = new ArrayList(_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); } @Override public PartitionManager getManager(Partition partition) { return _managers.get(partition); }} 1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口
package com.mixbox.storm.kafka;import java.util.List;/** * @author Yin Shuai */public interface PartitionCoordinator { /** * 拿到我管理的分区列表 List{PartitionManager} * @return */ List getMyManagedPartitions(); /** * @param 依据制定的分区partition,去getManager * @return */ PartitionManager getManager(Partition partition);} 第一个方法拿到所有的 PartitionManager
第二个方法依据特定的 Partition去得到一个分区管理器
关于 Storm-kafka中如何理解ZkCoordinator的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
任务
过程
动态
方法
问题
管理
文件
更多
帮助
解答
配置
易行
简单易行
信息
内容
实例
小伙
小伙伴
拓扑
接口
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
公安宣传网络安全知识
华邦互联网科技
戴尔服务器红屏
服务器做raid步骤
东城区网络安全招标
泰拉瑞亚服务器地址
数据库安全解决方案
黑龙江电信服务器地址云空间
猎鹿人无法连接服务器
三级数据库填空题题库百度云
新建dns服务器
优迈服务器如何清除故障
无线打印机服务器安全吗
软件开发的目标是什么
速达数据库密码
怎样启动用友管理服务器
云数据库监控
数据库建立自定义数据类型
成华区源的网络技术工作室
两组数据提取相同的数据库
隐私计算技术分为数据库安全
网络技术与教育心得体会
世界服务器ip
如何把结构化数据库变成知识图谱
软件开发的部门的职责
有哪几种网络安全防护技术
网络安全常识十条大学生
安徽app软件开发平台有哪些
8个软件开发小工具
短视频原生态软件开发