storm如何配置使用
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要为大家展示了"storm如何配置使用",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"storm如何配置使用"这篇文章吧。示例代码如下:#stor
千家信息网最后更新 2025年12月01日storm如何配置使用
这篇文章主要为大家展示了"storm如何配置使用",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"storm如何配置使用"这篇文章吧。
示例代码如下:
#storm.yaml 配置#zookeeper storm.zookeeper.servers: - "bigdata01" - "bigdata02" - "bigdata03"#本地存放数据的路径storm.local.dir: "/apps/storm"#nimbus masternimbus.seeds: ["bigdata00"]#workder端口supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 启动命令 bin/ nohup storm nimbus & bin/ nohup storm supervisor & bin/ nohup storm ui &--------------------------------------------------------------------------------------package com.hgs.storm;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.generated.AlreadyAliveException;import org.apache.storm.generated.AuthorizationException;import org.apache.storm.generated.InvalidTopologyException;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;public class StormWordCountTest { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("wordspout", new WordCountSpout(), 3); builder.setBolt("splitword", (IRichBolt) new WordSpliteBolt(), 2).shuffleGrouping("wordspout"); //word 是splitword发出的字段,如第九十行 builder.setBolt("wordcount", new WordCountBolt(), 2).fieldsGrouping("splitword", new Fields("word")); Config config = new Config(); config.setNumWorkers(2);/* StormSubmitter.submitTopology("words-count", config, builder.createTopology()); if(args!=null && args.length>0) { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); }else { LocalCluster cluster = new LocalCluster(); }*/ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("words-count", config, builder.createTopology()); }}class WordCountSpout extends BaseRichSpout{ private static final long serialVersionUID = 1L; //从open方法中的到collector,用于declareOutputFields 方法发出字段信息 SpoutOutputCollector collector = null; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { collector.emit(new Values(" this is my first storm program so i hope it will success")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields("message")); } }class WordSpliteBolt extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getString(0); String[] words = line.split(" "); for(String wd : words) { collector.emit(new Values(wd ,1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }class WordCountBolt extends BaseRichBolt{ ConcurrentHashMap wordsMap = new ConcurrentHashMap(); private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = input.getString(0); Integer num = input.getInteger(1); if(wordsMap.containsKey(word)) { wordsMap.put(word, wordsMap.get(word)+num); }else { wordsMap.put(word, num); } System.out.println(word +"----"+wordsMap.get(word)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } 以上是"storm如何配置使用"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
配置
内容
篇文章
字段
方法
学习
帮助
代码
信息
命令
数据
易懂
更多
条理
知识
示例
端口
编带
行业
资讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
宁波计算机软件开发管理
网络普法包括网络安全吗
切换安全补丁服务器
印之互联网科技有限公司
有人串口服务器sina
网络安全更高的要求
服务器远程桌面空白怎么解决
服务器对电脑有影响吗
我的世界国外盗版服务器
维普科技期刊文摘索引数据库
原神之后服务器会合并吗
正式服怎么换服务器
云服务器 独立ip
南昌定制软件开发多少钱
软件开发项目面试问题
服务器的管理模式
天天微视系统模式定制软件开发
西工大网络安全专业就业情况
数据库大数据ppt
洛阳服务器租用
网络安全编程与实践微盘
完全自主代码数据库
软件开发主持人的主要工作
链家网络安全有限公司
物联网 网络技术
大学生网络安全教育的内涵
白果服务器
甘肃电商软件开发费用
网络安全在国家的战略地位论文
战地1服务器选什么