storm-kafka(storm spout作为kafka的消费端)
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,storm是grovvy写的kafka是scala写的storm-kafka storm连接kafka consumer的插件下载地址:https://github.com/wurstmeister/
千家信息网最后更新 2025年12月03日storm-kafka(storm spout作为kafka的消费端)
storm是grovvy写的
kafka是scala写的
storm-kafka storm连接kafka consumer的插件
下载地址:
https://github.com/wurstmeister/storm-kafka-0.8-plus
除了需要storm和kafka相关jar包还需要google-collections-1.0.jar
以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar
以前由com.netflix.curator组织开发现在归到org.apache.curator下面
1.Kafka Consumer即Storm Spout代码
package demo;import java.util.ArrayList;import java.util.List;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.spout.SchemeAsMultiScheme;import backtype.storm.topology.TopologyBuilder;import storm.kafka.KafkaSpout;import storm.kafka.SpoutConfig;import storm.kafka.StringScheme;import storm.kafka.ZkHosts;public class MyKafkaSpout {public static void main(String[] args) { String topic ="track"; ZkHosts zkhosts = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181"); SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic, "/MyKafka", //偏移量offset的根目录 "MyTrack");//子目录对应一个应用 List zkServers=new ArrayList(); //zkServers.add("192.168.1.107"); //zkServers.add("192.168.1.108"); for(String host:zkhosts.brokerZkStr.split(",")) { zkServers.add(host.split(":")[0]); } spoutConfig.zkServers=zkServers; spoutConfig.zkPort=2181; spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成false的 spoutConfig.socketTimeoutMs=60; spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定义输出为string类型 TopologyBuilder builder=new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并发度设为1 builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout"); Config config =new Config(); config.setDebug(true);//上线之前都要改成false否则日志会非常多 if(args.length>0){ try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else{ LocalCluster localCluster=new LocalCluster(); localCluster.submitTopology("mytopology", config, builder.createTopology()); //本地模式在一个进程里面模拟一个storm集群的所有功能 } }} 2.Bolt代码只是简单打印输出,覆写execute方法即可
package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class MyKafkaBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } @Override public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector arg1) { String kafkaMsg =input.getString(0); System.err.println("bolt"+kafkaMsg); } @Override public void prepare(Map arg0, TopologyContext arg1) { // TODO Auto-generated method stub }}
代码
输出
消费
从头
功能
只是
地址
子目
子目录
实际
实际上
插件
插件下载
方法
日志
根目录
模式
类型
进程
集群
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
手机锁屏连接不到服务器
华为服务器没有响应
服务器代理高匿清洗
广州天是网络技术有限公司
深圳交易软件开发平台
苏州四海升平网络技术有限公司
山西网络时间同步服务器云空间
铁路网络安全的重要性
c s软件开发的核心技术
通州西集网络安全员招聘
csgo全球服务器
恩科网络技术
软件开发应该看哪方面书
服务器3代cpu是什么架构
北航网络安全学院在哪
电信网络安全工作总结
精益软件开发七项原则
mysql数据库怎么建立
2021网络安全宣传周晚会
逃离塔科夫国内加载什么服务器
服务器内存满了怎么办
数据库活锁解决
数据库有左连接为什么还要右连接
网络安全和信息化方针
php写服务器端
数据库带源地址怎么平
php搜索数据库设计
瑞金伯乐网络技术有限公司
网络安全法如何实施
民宗领域网络安全