千家信息网

Storm-kafka接口怎么实现

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容主要讲解"Storm-kafka接口怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm-kafka接口怎么实现"吧!阅读背景: 如有
千家信息网最后更新 2025年12月02日Storm-kafka接口怎么实现

本篇内容主要讲解"Storm-kafka接口怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm-kafka接口怎么实现"吧!

阅读背景: 如有需要,尽情参看本空间的另外一篇文档

阅读目的:了解Storm 如何来封装kafka接口,如何处理Connection连接的封装性问题

package com.mixbox.storm.kafka;import kafka.javaapi.consumer.SimpleConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.IBrokerReader;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;/** * 2014/07/22 * 动态的【分区连接】 * @author Yin Shuai */public class DynamicPartitionConnections {        public static final Logger LOG = LoggerFactory                        .getLogger(DynamicPartitionConnections.class);        /**         * 持有了一个 kafka底层的SimpleConsumer对象         * 持有了  具体的分区         *          * @author Yin Shuai         */                        static class ConnectionInfo {                //内部维持了一个SimpleConsumer                SimpleConsumer consumer;                                //分区                Set partitions = new HashSet();                public ConnectionInfo(SimpleConsumer consumer) {                        this.consumer = consumer;                }        }        /**         * 也就是kafka的每一个节点都维持了一个COnnectionInfo,ConnectionInfo         */        Map _connections = new HashMap();        // kafkaConfig        KafkaConfig _config;        /**         * IBrokerReader 基本上 IbroerReader这里初始化的是ZkBrokerReader         */        IBrokerReader _reader;        /**         * @param config         *            kafka配置         * @param brokerReader         *            IBrokerReader-用于拿到当前的接口         */        public DynamicPartitionConnections(KafkaConfig config,                        IBrokerReader brokerReader) {                _config = config;                _reader = brokerReader;        }        /**         * @param partition  分区         * @return         */        public SimpleConsumer register(Partition partition) {                /**                 * 依据你所拥有的partition号,拿到你所对应的Broker                 * GlobalPartitionInformation中有Map                 * partitionMap,记录了分区号与Broker所对应的关系                 */                Broker broker = _reader.getCurrentBrokers().getBrokerFor(                                partition.partition);                return register(broker, partition.partition);        }        /**         * @param host         *            主机         * @param partition         *            分区         * @return 底层的SimpleConsumer 对象,这里存在一个注册的行为,将主机和端口【broker】,和分区【partition】 注册到 connections连接之中         */        public SimpleConsumer register(Broker host, int partition) {                // Map _connections = new HashMap();                //如果连接之中没有包含了Broker,那么建立一个新的连接,并且将这个  主机和连接注册到  _connections之中                if (!_connections.containsKey(host)) {                        _connections.put(host, new ConnectionInfo(new SimpleConsumer(                                        host.host, host.port, _config.socketTimeoutMs,                                        _config.bufferSizeBytes, _config.clientId)));                }                                // ---------   在这里,不管之前有没有都只取一次 -------------                                //当包含了,那就直接取出                ConnectionInfo info = _connections.get(host);                info.partitions.add(partition);                return info.consumer;        }        public SimpleConsumer getConnection(Partition partition) {                // ConnectionInfo 之中封装了一个simpleConsumer                ConnectionInfo info = _connections.get(partition.host);                if (info != null) {                        return info.consumer;                }                return null;        }        /**         * @param port    固定的Broker         * @param partition  固定的分区         */        public void unregister(Broker port, int partition) {                ConnectionInfo info = _connections.get(port);                info.partitions.remove(partition);                if (info.partitions.isEmpty()) {                        info.consumer.close();                        _connections.remove(port);                }        }        public void unregister(Partition partition) {                unregister(partition.host, partition.partition);        }        public void clear() {                for (ConnectionInfo info : _connections.values()) {                        info.consumer.close();                }                _connections.clear();        }}

与前文有关

1: 在DynamicPartitionConnections之中,我们持有了一个 IBrokerReader的接口对象。

2 : 由于IBrokerReader 派生出了

2.1 StaticBrokerReader

2.2 ZBrokerReader

在这个序列的一系列博文之中,ZBrokerReader已经进行了详尽的分析,并且在赋值的过程之中,IBrokerReader也是实例化为ZBrokerReader了。

内部类:

DynamicPartitionConnections 持有了一个 CinnectionInfo的内部类

static class ConnectionInfo {                //内部维持了一个SimpleConsumer                SimpleConsumer consumer;                                //分区                Set partitions = new HashSet();                public ConnectionInfo(SimpleConsumer consumer) {                        this.consumer = consumer;                }        }

1: 对于每一个Connection内部都维持了一个SimpleConsumer ,以及一个 Set集合 partitions

2 :在DynamicPartitionConnections里面我们维持了一个_connections的对象

Map _connections = new HashMap();

3 :在连接维护之中,关键的地方是维护一个 register注册的行为:

public SimpleConsumer register(Broker host, int partition) {

4: 如果_connections之中没有包含Broker,那么将会再建立一个新的连接,并且将Broker和Connection 注册到_connections之中

5:在注册的过程之中,不包含就注册,最后都直接取出SimpleConsumer,这个SimpleConsumer

封装了

new ConnectionInfo(new SimpleConsumer(

host.host, host.port, _config.socketTimeoutMs,

_config.bufferSizeBytes, _config.clientId)):

到此,相信大家对"Storm-kafka接口怎么实现"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

之中 接口 对象 封装 主机 内容 底层 行为 过程 学习 实用 更深 详尽 也就是 关键 兴趣 动态 地方 实例 实用性 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器ddr3能用普通主板吗 代理服务器的分类 阿里云服务器文件上传 it软件开发是程序员吗 永定区标乾网络技术工作室 迁安市网络安全工作会议 常熟网络技术共同合作 局域网 服务器 网站 湖南软件开发工程师培训哪家好 做三维设计用服务器可以吗 号牌抓取软件开发 江西省网络安全总队总工 骑士人才数据库 软件技术与网络技术关联性 django存储数据库数据 软件开发矩阵式组织架构缺点 oa系统服务器网址 网络安全风险处置报告 网络安全 述职报告 网络安全合作发展 迅通网络安全防护是真的吗 计算机网络技术考什么内容 计算机网络技术中职要点 哪些专科有网络安全专业 上海软件开发五年 收入 罗布乐思12月8日停止服务器 啥时候上架 在微信里发表情包服务器会保存吗 我为安全献一计网络安全口号 网络安全审核报告范本 网络安全 述职报告
0