Storm如何实现单词计数
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"Storm如何实现单词计数",在日常操作中,相信很多人在Storm如何实现单词计数问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm如何实现单词
千家信息网最后更新 2025年12月03日Storm如何实现单词计数
这篇文章主要介绍"Storm如何实现单词计数",在日常操作中,相信很多人在Storm如何实现单词计数问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm如何实现单词计数"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
1. 使用mvn命令创建项目
mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1
然后编辑配置文件pom.xml,添加storm依赖
org.apache.storm storm-core 0.9.4
最后通过下述命令来编译项目,编译正确完成后导入到IDE中
mvn install
当然,也可以在IDE中安装maven插件,从而直接在IDE中创建maven项目
2. 实现数据源,用重复的静态语句来模拟数据源
package storm.test.v1;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.util.Map;public class SentenceSpout extends BaseRichSpout { private String[] sentences = { "storm integrates with the queueing", "and database technologies you already use", "a storm topology consumes streams of data", "and processes those streams in arbitrarily complex ways", "repartitioning the streams between each stage of the computation however needed" }; private int index = 0; private SpoutOutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; } @Override public void nextTuple() { this.collector.emit(new Values(sentences[index])); index++; if (index >= sentences.length) { index = 0; } try { Thread.sleep(1); } catch (InterruptedException e) { } }}3. 实现语句分割bolt
package storm.test.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { this.collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); }}4. 实现单词计数bolt
package storm.test.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.Map;public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMap counts = null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap(); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L; } count++; this.counts.put(word, count); this.collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); }} 5. 实现上报bolt
package storm.test.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;public class ReportBolt extends BaseRichBolt { private HashMap counts = null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { counts = new HashMap(); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = tuple.getLongByField("count"); this.counts.put(word, count); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } @Override public void cleanup() { //本地模式下,终止topology时可以保证cleanup()被执行 System.out.println("--- FINAL COUNTS ---"); List keys = new ArrayList(); keys.addAll(this.counts.keySet()); Collections.sort(keys); for (String key : keys) { System.out.println(key + " : " + this.counts.get(key)); } System.out.println("----------"); }} 6. 实现单词计数topology
package storm.test.v1;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class WordCountTopology { private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt"; private static final String TOPOLOGY_NAME = "word-count-topology"; public static void main(String[] args) { SentenceSpout spout = new SentenceSpout(); SplitSentenceBolt spiltBolt = new SplitSentenceBolt(); WordCountBolt countBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout); //注册数据源 builder.setBolt(SPLIT_BOLT_ID, spiltBolt) //注册bolt .shuffleGrouping(SENTENCE_SPOUT_ID); //该bolt订阅spout随机均匀发射来的数据流 builder.setBolt(COUNT_BOLT_ID, countBolt) .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); //该bolt订阅spiltBolt发射来的数据流,并且保证"word"字段值相同的tuple会被路由到同一个countBolt builder.setBolt(REPORT_BOLT_ID, reportBolt) .globalGrouping(COUNT_BOLT_ID); //该bolt订阅countBolt发射来的数据流,并且所有的tuple都会被路由到唯一的一个reportBolt中 Config config = new Config(); //本地模式启动 LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { } cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); }}7. 运行结果:
--- FINAL COUNTS ---a : 302already : 302and : 604arbitrarily : 302between : 302complex : 302computation : 302consumes : 302data : 302database : 302each : 302however : 302in : 302integrates : 302needed : 302of : 604processes : 302queueing : 302repartitioning : 302stage : 302storm : 604streams : 906technologies : 302the : 906those : 302topology : 302use : 302ways : 302with : 302you : 302----------
到此,关于"Storm如何实现单词计数"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
单词
数据
学习
数据流
数据源
项目
发射
订阅
命令
更多
模式
语句
路由
保证
帮助
编译
实用
相同
接下来
字段
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
南京创惠互联网科技有限公司
贵州智慧医养软件开发专业制作
pubg一直处于服务器状态
软件开发用到大数据吗
软件开发阶段包括维护吗
服务器系统在u盘怎么看
苹果app签名证书软件开发
杨浦区会计软件开发诚信服务
数据库哪些最好
科技互联网捐赠汇总
怎么学好网络安全技术
hlwsccl服务器
怎么查看自己的服务器是什么系统
服装生产软件开发
网络安全自律规范
对日软件开发优势
网络安全事件上报
镇江专业网络安全准入控制公司
按照软件开发的阶段测试
服务器安装挖矿软件
济南机构养老软件开发系统
迈外迪网络安全吗
比威网络技术
关注网络安全威胁
计算机网络安全目标
碳交易系统软件开发上市公司
粘土服务器魔方大厦紫色攻略
嘉定区网络技术咨询平均价格
web服务器配置
网络安全规划的不足