storm启动类怎么定义
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要介绍"storm启动类怎么定义",在日常操作中,相信很多人在storm启动类怎么定义问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"storm启动类怎么定义"
千家信息网最后更新 2025年12月01日storm启动类怎么定义
这篇文章主要介绍"storm启动类怎么定义",在日常操作中,相信很多人在storm启动类怎么定义问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"storm启动类怎么定义"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在storm集群中真正运行Topology的主要有三个实例:工作进程丶线程和任务.
Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可以创建多个线程,每个线程可以执行多个任务.
Storm可靠性:是通过对消息树给定一个唯一的ID,每送一个消息,都会同步发送一个ack或fail,对于网络的宽带会有一定的消耗,如果对于可靠性要求不高,可以通过使用不同的emit接口关闭该模式.
一、storm启动类的定义。
package com.cmsz.storm.trading.test;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class MainStorm { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("A", new ASpout()); builder.setBolt("B", new BBolt()).shuffleGrouping("A", "streamId_B");//componentId和streamId builder.setBolt("C", new CBolt()).shuffleGrouping("A", "streamId_C");//componentId和streamId builder.setBolt("D", new DBolt()).fieldsGrouping("B", new Fields("id"));//componentId和streamId builder.setBolt("E", new EBolt()).fieldsGrouping("C", new Fields("id")); Config conf = new Config(); if (args != null && args.length > 0) { conf.setNumWorkers(1); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("myTopo", conf, builder.createTopology()); } }}二、spout定义了streamId,接受的bolt要定义componentId与spout中定义的streamId("streamId_B"、"streamId_C")对应定义去接收,fiedsGrouping的new Fields("id")中的id要和componentId的对应bolt中new Fields("id","message")中的"id"对应就会以"id"进行分组
package com.cmsz.storm.trading.test;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class ASpout extends BaseRichSpout{ SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Utils.sleep(10); final String[] words = new String[] {"B_nathan", "C_mike", "B_jackson", "C_golda", "B_bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; if(word.indexOf("B_")>-1){ collector.emit("streamId_B",new Values(word)); }else if(word.indexOf("C_")>-1){ collector.emit("streamId_C",new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("streamId_B", new Fields("streamId_B")); declarer.declareStream("streamId_C", new Fields("streamId_C")); } @Override public void ack(Object msgId) { super.ack(msgId); } @Override public void fail(Object msgId) { super.fail(msgId); }}package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class BBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","message")); } @Override public Map getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { String msg = input.getString(0); System.out.println(msg); collector.emit(new Values(msg,msg+"BBolt")); } @Override public void cleanup() { }} package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class CBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","message")); } @Override public Map getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { String msg = input.getString(0); System.out.println(msg); collector.emit(new Values(msg,msg+"CBolt")); } @Override public void cleanup() { }} package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;public class DBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } @Override public Map getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println("DBolt"+input.getString(0)); } @Override public void cleanup() { }} package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;public class EBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } @Override public Map getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println("EBolt"+input.getString(0)); } @Override public void cleanup() { }} 到此,关于"storm启动类怎么定义"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
学习
多个
线程
进程
工作
任务
可靠性
更多
消息
集群
帮助
运行
不同
实用
接下来
三个
可以通过
实例
接口
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库设计安全设置
网络技术副总经理岗位职责
北京唔哩网络技术招聘
我的世界国际版32k服务器ip
计算机网络技术的技能是什么
服务器必须装数据库吗
无锡橙久网络技术有限公司
公安网络安全总结
阿里云服务器可以退吗
国标软件开发模型
数据库联合主键优势
广州创游网络技术有限公司
饥荒独立服务器添加管理
青海省市场监督管理局网络安全
网络安全法宣贯的意义
网络数据库环境特性相关题目
工商银行软件开发中心英语
如何做网络安全宣传视频
数据库支持管理工具
陕西麦森网络技术有限公司
青岛存储服务器拆机
安徽享宿互联网科技有限公司
网络安全与信息化领域的课题
福建省电信网络安全管理条例
网络安全通知中学
计算网络技术属于什么专业
农行的软件开发
株洲服务器软件工程师寒假班
db2 联邦数据库
怎么成为苹果软件开发者