千家信息网

如何进行Twitter Storm Stream Grouping编写自定义分组实现

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章为大家展示了如何进行Twitter Storm Stream Grouping编写自定义分组实现,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。##自定
千家信息网最后更新 2025年12月03日如何进行Twitter Storm Stream Grouping编写自定义分组实现

本篇文章为大家展示了如何进行Twitter Storm Stream Grouping编写自定义分组实现,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

##自定义Grouping测试

Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。

这是我写的一个自定义分组,总是把数据分到第一个Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping {    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);    private List tasks;    @Override    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,                List targetTasks) {            this.tasks = targetTasks;            log.info(tasks.toString());    }           @Override    public List chooseTasks(int taskId, List values) {            log.info(values.toString());            return Arrays.asList(tasks.get(0));    }}

从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后总是被派发到第一组。

测试代码:

TopologyBuilder builder = new TopologyBuilder();builder.setSpout("words", new TestWordSpout(), 2); //自定义分组,builder.setBolt("exclaim1", new DefaultStringBolt(), 3)            .customGrouping("words", new MyFirstStreamGrouping());

和之前的测试用例一样,Spout总是发送new String[] {"nathan", "mike", "jackson", "golda", "bertels"}列表的字符串。我们运行验证一下:

11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。

##理解自定义分组实现

自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。

Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:

public class Partitioner {    @Override    public int getPartition(K key, V value, int numReduceTasks) {        return 0;    }}

上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:

public int getPartition(K key, V value, int numReduceTasks) {    return hash(key) % numReduceTasks;}

其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。

搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。

这是CustomStreamGrouping类的源码:

public interface CustomStreamGrouping extends Serializable {   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks);   List chooseTasks(int taskId, List values); }

一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

如上文文章开头的自定义分组器实现的代码,我选择的总是让第一个Task来处理数据, return Arrays.asList(tasks.get(0)); 。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List.就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

上述内容就是如何进行Twitter Storm Stream Grouping编写自定义分组实现,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

数据 分组 就是 测试 代码 线程 处理 选择 不同 文章 方法 这是 运行 内容 多个 技能 目标 知识 磁盘 道理 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 国内app软件开发公司 华为手机不显示动数据库 软件开发实习生实习内容 联合国amis数据库 php获取数据库时间 电力系统网络安全的防范措施 网络安全路由器和防火墙区别 上海gps网络时间服务器地址 迁安信息网络技术不二之选 服务器 假期无人运行 安全 电脑网络安全防护未开启在哪设置 服务器日志记录如何查询 软件开发软件质量评价 幻塔白月破晓服务器可以转服没有 注意网络安全的英语作文 香港维普软件开发 企业管理系统 服务器 数据库 ole对象类型 借还款的信息报送金融数据库 海康威视平台服务器视频 女软件开发女生博士就业前 网络技术制度 住建局网络安全自检自查报告 网络安全领导小组主要职责 湖州智诚网络技术有限公司电话 安防nvr软件开发 服务器千兆还是万兆网卡好 小学生网络安全主题班会会标 桌面随机点名软件开发 网络安全问题咨询哪里
0