千家信息网

Storm开发细节是什么

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"Storm开发细节是什么",在日常操作中,相信很多人在Storm开发细节是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm开发细节是什么"
千家信息网最后更新 2025年12月03日Storm开发细节是什么

这篇文章主要介绍"Storm开发细节是什么",在日常操作中,相信很多人在Storm开发细节是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm开发细节是什么"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

package test;import java.io.IOException;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import storm.copyFromClass.TestWordSpout;import com.esotericsoftware.minlog.Log;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;// 测试目的,在这里我们需要测试一下当前Spout 不断产生数据的过程public class testWordSpoutTopology {        public static class TestSimpleBolt extends BaseBasicBolt {                @Override                public void execute(Tuple input, BasicOutputCollector collector) {                        System.out.println(input.toString());                }                @Override                public void declareOutputFields(OutputFieldsDeclarer declarer) {                        System.out.println("Method declare");                }        }                public static void main(String[] args) throws IOException {                        // 首先,我们必须建立一个新的TopologyBuilder                TopologyBuilder builder = new TopologyBuilder();                                //其次,我们需要配置如下的组件: 1 Spout,2Bolt                builder.setSpout("word-emit-byThread", new TestWordSpout());                                //在这个Spout之中,我们约定将 【word-emit-byThread】Spout组件 发射的元祖进行                shuffleGrouping                                builder.setBolt("word-show", new TestSimpleBolt()).shuffleGrouping(                                "word-emit-byThread");                Config config = new Config();                config.setDebug(false);                //最后进行本地提交                LocalCluster cluster = new LocalCluster();                cluster.submitTopology("simple", config, builder.createTopology());        }}

以上,

testWordSpoutTopology

是我们运行的主类

package storm.copyFromClass;import backtype.storm.Config;import backtype.storm.topology.OutputFieldsDeclarer;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import java.util.HashMap;import java.util.Random;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//public class TestWordSpout extends BaseRichSpout {    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);    boolean _isDistributed;    SpoutOutputCollector _collector;    public TestWordSpout() {        this(true);    }    public TestWordSpout(boolean isDistributed) {        _isDistributed = isDistributed;    }            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {        _collector = collector;    }        public void close() {            }                    // 发送    public void nextTuple() {        Utils.sleep(100);        final String[] words = new String[] { "张兵", "吴哥", "仝志维", "前辈", "禅师"};        final Random rand = new Random();        final String word = words[rand.nextInt(words.length)];        _collector.emit(new Values(word));    }        //在这里,我们没有进行ACK    public void ack(Object msgId) {    }    //在这里,我们没有进行fail    public void fail(Object msgId) {            }        public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("word"));    }    @Override    public Map getComponentConfiguration() {        if(!_isDistributed) {            Map ret = new HashMap();            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);            return ret;        } else {            return null;        }    }    }

结果:

请注意在这里,我们的Stream 默认的id为空

到此,关于"Storm开发细节是什么"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0