Storm中怎么使用Direct Grouping分组策略
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"Storm中怎么使用Direct Grouping分组策略",在日常操作中,相信很多人在Storm中怎么使用Direct Grouping分组策略问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望能帮助大家解答疑惑。
千家信息网最后更新 2025年12月03日Storm中怎么使用Direct Grouping分组策略
这篇文章主要介绍"Storm中怎么使用Direct Grouping分组策略",在日常操作中,相信很多人在Storm中怎么使用Direct Grouping分组策略问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm中怎么使用Direct Grouping分组策略"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
使用 Direct Grouping 分组策略,将首字母相同的单词发送给同一个task计数
数据源spout
package com.zhch.v3;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.io.BufferedReader;import java.io.FileReader;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;public class SentenceSpout extends BaseRichSpout { private FileReader fileReader = null; private boolean completed = false; private ConcurrentHashMap pending; private SpoutOutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; this.pending = new ConcurrentHashMap(); try { this.fileReader = new FileReader(map.get("wordsFile").toString()); } catch (Exception e) { throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]"); } } @Override public void nextTuple() { if (completed) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } String line; BufferedReader reader = new BufferedReader(fileReader); try { while ((line = reader.readLine()) != null) { Values values = new Values(line); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, values); this.collector.emit(values, msgId); } } catch (Exception e) { throw new RuntimeException("Error reading tuple", e); } finally { completed = true; } } @Override public void ack(Object msgId) { this.pending.remove(msgId); } @Override public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId); }} 实现语句分割bolt
package com.zhch.v3;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.List;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; private List numCounterTasks; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; //获取下游bolt的taskId列表 this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID); } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word)); collector.emitDirect(taskId, tuple, new Values(word)); } this.collector.ack(tuple); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if (word.isEmpty()) return 0; else { //单词首字母对下游 bolt taskId 列表长度取余 return word.charAt(0) % numCounterTasks.size(); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); }} 实现单词计数bolt
package com.zhch.v3;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import java.io.BufferedWriter;import java.io.FileWriter;import java.util.HashMap;import java.util.Iterator;import java.util.Map;public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMap counts = null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap(); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L; } count++; this.counts.put(word, count); BufferedWriter writer = null; try { writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt")); Iterator keys = this.counts.keySet().iterator(); while (keys.hasNext()) { String w = keys.next(); Long c = this.counts.get(w); writer.write(w + " : " + c); writer.newLine(); writer.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { if (writer != null) { try { writer.close(); } catch (Exception e) { e.printStackTrace(); } writer = null; } } this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); }} 实现单词计数topology
package com.zhch.v3;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
 * Wires the word-count topology together:
 * spout -> (shuffle) split bolt -> (direct grouping) count bolt.
 *
 * Usage: WordCountTopology &lt;wordsFile&gt; [topologyName]
 * With a topology name the topology is submitted to the cluster; without one
 * it runs briefly in a LocalCluster and shuts down.
 */
public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    public static void main(String[] args) throws Exception {
        // Bug fix: the original read args[0] before any null/length check
        // (the "args != null" test came after the access), so launching with
        // no arguments threw ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: WordCountTopology <wordsFile> [topologyName]");
        }

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // 4 tasks across 2 executors; the split bolt addresses count-bolt
        // tasks explicitly via emitDirect.
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID); //使用 Direct Grouping 分组策略

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args.length > 1) {
            config.setNumWorkers(2); //集群模式启动
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                // Let the local topology run briefly before tearing it down.
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
提交到Storm集群
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3
运行结果:
[grid@hadoop5 stormData]$ cat result.txt second : 1can : 1set : 1simple : 1use : 2unbounded : 1used : 1It : 1Storm : 4online : 1cases: : 1open : 1Apache : 1of : 2over : 1more : 1clocked : 1easy : 2scalable : 1any : 1guarantees : 1ETL : 1million : 1continuous : 1is : 6with : 1it : 2makes : 1your : 1a : 4at : 1machine : 1analytics : 1up : 1and : 5many : 1system : 1source : 1what : 1operate : 1will : 1computation : 2streams : 1[grid@hadoop6 stormData]$ cat result.txt to : 3for : 2data : 2distributed : 2has : 1free : 1programming : 1reliably : 1fast: : 1processing : 2be : 2Hadoop : 1did : 1fun : 1learning : 1torm : 1process : 1RPC : 1node : 1processed : 2per : 2realtime : 3benchmark : 1batch : 1doing : 1lot : 1language : 1tuples : 1fault-tolerant : 1
到此,关于"Storm中怎么使用Direct Grouping分组策略"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
策略
分组
单词
学习
字母
更多
集群
帮助
实用
相同
接下来
数据
数据源
文章
方法
模式
理论
知识
篇文章
结果
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
cf端游最好看的服务器
网络安全教育怎么写字
夏普打印机无法进入服务器
苹果7没有语音与数据库
银行产品表数据库
shopex连接不上数据库
软件开发主管竞选
新乡华诚网络技术有限公司
选择计算机网络技术的认知
北京德博网络技术公司怎么样
数据库大小2g怎么导入
我的世界手机版服务器生存合集
移动设备网络安全概念股
学业考试网络技术应用
衢州制造业产品追溯软件开发
网络安全信息委题
反电诈网络安全教育心得300字
dz论坛用什么web服务器
舟山软件开发部
app软件开发多钱
软件开发简易流程
公开的ftp服务器
吉林省超级服务器云服务器
哪项扩展和数据库无关
数据库二级缓存是什么意思
专门数据库
宝山区参考软件开发制造价格
视频网站使用什么数据库
csdn数据库原理
网络安全法宣贯情况