storm的本地模式demo怎么实现
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容介绍了"storm的本地模式demo怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!S
千家信息网最后更新 2025年12月02日storm的本地模式demo怎么实现
本篇内容介绍了"storm的本地模式demo怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
SimpleTopology.java
package com.zgl.helloword;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;/** * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。 * * @author Administrator * */public class SimpleTopology { public static void main(String[] args) { try { // 实例化TopologyBuilder类。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); Config config = new Config(); config.setDebug(false); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 这里是本地模式下运行的启动代码。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, topologyBuilder.createTopology()); } } catch (Exception e) { e.printStackTrace(); } }}SimpleSpout.java
package com.zgl.helloword;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;/** * Spout起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务 * * @author Administrator * */public class SimpleSpout extends BaseRichSpout{ /** * */ private static final long serialVersionUID = 1L; //用来发射数据的工具类 private SpoutOutputCollector collector; private static String[] info = new String[]{ "comaple\t,12424,44w46,654,12424,44w46,654,", "lisi\t,435435,6537,12424,44w46,654,", "lipeng\t,45735,6757,12424,44w46,654,", "hujintao\t,45735,6757,12424,44w46,654,", "jiangmin\t,23545,6457,2455,7576,qr44453", "beijing\t,435435,6537,12424,44w46,654,", "xiaoming\t,46654,8579,w3675,85877,077998,", "xiaozhang\t,9789,788,97978,656,345235,09889,", "ceo\t,46654,8579,w3675,85877,077998,", "cto\t,46654,8579,w3675,85877,077998,", "zhansan\t,46654,8579,w3675,85877,077998,"}; Random random=new Random(); /** * 初始化collector */ @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 */ public void nextTuple() { try { String msg = info[random.nextInt(11)]; // 调用发射方法 collector.emit(new Values(msg)); // 模拟等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 }}SimpleBolt.java
package com.zgl.helloword;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;/** * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。 * * @author Administrator * */@SuppressWarnings("serial")public class SimpleBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null){ System.out.println("msg="+msg); collector.emit(new Values(msg + "msg is processed!")); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("info")); }}pom.xml
4.0.0 strom-zgl storm-zgl 0.0.1-SNAPSHOT jar storm-zgl http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-core 0.9.1-incubating
"storm的本地模式demo怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
节点
模式
发射
处理
很大
作用
内容
字段
数据处理
方法
更多
用处
知识
集群
分配
复杂
实用
学有所成
接下来
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
wps数据库建表
澳洲软件开发公司创业史
计算机神经网络技术
网络安全的正面事例
福山区定制软件开发公司有哪些
绝地求生国服高性能服务器
软件服务费和软件开发费用
宝塔服务器管理系统好用吗
抢占数据库
云端数据库编程模板
和平精英大厅服务器异常
软件开发工程师外包费用
数据库表无法导入的解决方法
主机防护服务器
数据库etl工具
天使之战各平台服务器划分
国内数据库类型
2020年网络安全宣传片
高校 软件开发能力
网络安全非传统安全问题
研究生计算机网络技术学习方法
社区开展网络安全宣传教育
澳洲网络安全年薪
软件开发公司有加盟的吗
手机视频加速服务器打开了会怎样
网络安全专业工作方向
公安网络安全责任书
天猫购物车下单服务器出错怎么办
极进网络技术有限公司
职业学校学计算机数据库管理