千家信息网

storm中如何自定义数据分组

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,今天就跟大家聊聊有关storm中如何自定义数据分组,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。数据流组设计一个拓扑时,你要做的最重要的事情之
千家信息网最后更新 2025年12月02日storm中如何自定义数据分组

今天就跟大家聊聊有关storm中如何自定义数据分组,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

数据流组

设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们。

storm自带数据流组

随机数据流组

随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

 builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

域数据流组

域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。回到单词计数器的例子,如果你用word域为数据流分组,word-normalizer bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。

 builder.setBolt("word-counter", new WordCounter(),2)           .fieldsGrouping("word-normalizer", new Fields("word"));

全部数据流组

全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能

builder.setBolt("word-counter", new WordCounter(),2)           .fieldsGroupint("word-normalizer",new Fields("word"))           .allGrouping("signals-spout","signals");

直接数据流组

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组

 builder.setBolt("word-counter", new WordCounter(),2)           .directGrouping("word-normalizer");

。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。

public void execute(Tuple input) {        ...        for(String word : words){            if(!word.isEmpty()){                ...                collector.emitDirect(getWordCountIndex(word),new Values(word));            }        }        //对元组做出应答        collector.ack(input);    }    public Integer getWordCountIndex(String word) {        word = word.trim().toUpperCase();        if(word.isEmpty()){            return 0;        }else{            return word.charAt(0) % numCounterTasks;        }    }

在prepare方法中计算任务数

 public void prepare(Map stormConf, TopologyContext context,                 OutputCollector collector) {        this.collector = collector;        this.numCounterTasks = context.getComponentTasks("word-counter");    }

全局数据流组

全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

不分组

这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。

自定义数据流组

storm自定义数据流组和hadoop Partitioner分组很相似,storm自定义分组要实现CustomStreamGrouping接口,接口源码如下:

public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks); List chooseTasks( int taskId, List values); }

targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

这是我写的一个自定义分组,总是把数据分到第一个Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping. class ); private List tasks; @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this .tasks = targetTasks; log.info(tasks.toString()); } @Override public List chooseTasks( int taskId, List values) { log.info(values.toString()); return Arrays.asList(tasks.get( 0 )); } }

从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后总是被派发到第一组。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List .就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

第二个自定义分组,wordcount中使首字母相同的单词交给同一个bolt处理:

public class ModuleGrouping implements CustormStreamGrouping{        int numTasks = 0;        @Override        public List chooseTasks(List values) {            List boltIds = new ArrayList();            if(values.size()>0){                String str = values.get(0).toString();                if(str.isEmpty()){                    boltIds.add(0);                }else{                    boltIds.add(str.charAt(0) % numTasks);                }            }            return boltIds;        }        @Override        public void prepare(TopologyContext context, Fields outFields, List targetTasks) {            numTasks = targetTasks.size();        }    }

这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

builder.setBolt("word-normalizer", new WordNormalizer())           .customGrouping("word-reader", new ModuleGrouping());

看完上述内容,你们对storm中如何自定义数据分组有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

数据 数据流 分组 单词 数据源 就是 处理 消费 选择 相同 任务 例子 内容 字母 实例 目标 组件 缓存 计数器 这是 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全文库库 使用命令访问FTP服务器 计算机软件开发的基金有哪些 南开区网络安全和信息化委员会 我国网络安全和政治的联系 查询服务器的bmc地址 定制软件开发预算金额 供应串口联网服务器公司 广州聚家互联网科技有限公司 五年来软件开发行业平均指标 开展网络安全法治 权汇网络技术 青岛智能未来软件开发 百隆科技公司互联网流量 数据库各表之间的计算 知名软件开发优质推荐 每次都显示无法连接到服务器 php安装数据库 网络安全板块怎么操作 脏小豆服务器编号是多少 长春汇网络技术有限公司 网络安全主题简单又漂亮的手抄报 疫苗网络安全手抄报该写什么 海南万稷网络技术有限公司旗下游戏 假面骑士帝骑把b站服务器炸了 存储技术和数据库设计 数据库读一致性场景 江苏软件开发的价格 数据库响应慢如何解决 职中计算机网络技术怎么样
0