Storm-kafka中如何封装DynamicBrokerReader类
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在细节上把握 DynamicBrokerR
千家信息网最后更新 2025年12月03日Storm-kafka中如何封装DynamicBrokerReader类
这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
在细节上把握 DynamicBrokerReder的封装类 - ZkBrokerReader
package com.mixbox.storm.kafka.trident;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.DynamicBrokersReader;import com.mixbox.storm.kafka.ZkHosts;import java.util.Map;/** * 2014/07/22 * 在ZK中间拿到 GlobalPartitionInformation * * ZkBrokerReader 是对于DynamicBrokersReader的一个简单的封装 * @author Yin Shuai */public class ZkBrokerReader implements IBrokerReader { public static final Logger LOG = LoggerFactory .getLogger(ZkBrokerReader.class); GlobalPartitionInformation cachedBrokers; DynamicBrokersReader reader; long lastRefreshTimeMs; long refreshMillis; /** * * @param conf * @param topic * 指定topic的zkBrokerReader * @param hosts */ public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) { reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = System.currentTimeMillis(); refreshMillis = hosts.refreshFreqSecs * 1000L; } @Override public GlobalPartitionInformation getCurrentBrokers() { long currTime = System.currentTimeMillis(); // 很简单, 指定了你多长时间开始去刷新Brokerlibiao if (currTime > lastRefreshTimeMs + refreshMillis) { LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = currTime; } return cachedBrokers; } @Override public void close() { reader.close(); }}总览我们的Code :
ZkBrokerReader 是对于 DynamicBrokersReader的一个简单封装,ZkBrokerReader之中持有2个主要的Class
1 GlobalPartitionInformatio cachedBroker;
2 DynamicBrokersReader reader;
3 long lastRefreshTimeMs; 最新的刷新时间
lastRefreshTimeMs = System.currentTimeMillis(); 最新的刷新时间为系统的当前时间
4 long refreshMillis
refreshMillis = host.refreshFreqSecs * 1000L 设定刷新的毫秒数为
5
public GlobalPartitionInformation getCurrentBrokers() { long currTime = System.currentTimeMillis(); // 很简单, 指定了你多长时间开始去刷新Brokerlibiao if (currTime > lastRefreshTimeMs + refreshMillis) { LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = currTime; } return cachedBrokers; }每一次调用getCurrentBrokers,首先会取System.currentTimeMillis 当当前的系统时间超过了 最早的刷新时间+刷新
的间隔,就会再次的去跟新:
cachedBrokers = reader.getBrokerInfo(); getBrokerInfo()方法每调用一次,也就重新在zk之中重新去取
一次。
ZkBrokerReader是对于DynamicBrokerReader的一个封装,DynamicBrokerReader的Dynamic性质并不程序动态的因数,而只是简单在读取ZK数据的过程之中,Zk数据已经动态的发生变化?
以上是"Storm-kafka中如何封装DynamicBrokerReader类"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!
时间
封装
之中
内容
动态
数据
篇文章
系统
价值
兴趣
再次
只是
因数
小伙
小伙伴
性质
方法
更多
知识
程序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库监控分析
个人数据库开发
软件开发行业监管机构
服务器如何排查nmap
数据库文件由哪两大文件构成
winform监控云数据库变化
网络安全知识竞赛活动策划案
软件开发的用户群
防范网络安全等风险人民网
企业网络安全汇报
数据库ado技术原理
沈阳志微君安网络技术
hp服务器是什么
服务器lldp开启
北京赛通网络技术公司
网约车平台数据库接入
提倡注意网络安全的标题
数据库自动生成主键
k8s有搭载在服务器上的么
软件开发有哪些课件
图数据库实现oneid
珠海金融软件开发定制
诸暨市宇君网络技术有限公司
数据库安全网关功能
软件开发 周记
万应互联网科技有限公司
安卓手机关闭vpn服务器
南充网络技术是什么
包头网络技术操作
fifaol+3韩服数据库