如何进行storm1.1.3与kafka1.0.0整合
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。packa
千家信息网最后更新 2025年12月02日如何进行storm1.1.3与kafka1.0.0整合
本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
package hgs.core.sk;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Tuple;@SuppressWarnings("deprecation")public class StormKafkaMainTest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //zookeeper链接地址 BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181"); //KafkaSpout需要一个config,参数代表的意义1:zookeeper链接,2:消费kafka的topic,3,4:记录消费offset的zookeeper地址 ,这里会保存在 zookeeper //集群的/test7/consume下面 SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume"); //消费的时候忽略offset从头开始消费,这里可以注释掉,因为消费的offset在zookeeper中可以找到 sconfig.ignoreZkOffsets=true; //sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() ); builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1); builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(1); try { StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } /* LocalCluster cu = new LocalCluster(); cu.submitTopology("test", config, builder.createTopology());*/ }}class MyboltO extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容 //因为得到的内容是byte数组,所以需要转换 String out = new String((byte[])input.getValue(0)); System.out.println(out); collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }pom.xml文件的依赖
4.0.0 hgs core.sk 1.0.0-SNAPSHOT jar core.sk http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-kafka 1.1.3 org.apache.storm storm-core 1.1.3 provided org.apache.kafka kafka_2.11 1.0.0 org.slf4j slf4j-log4j12 org.apache.zookeeper zookeeper org.clojure clojure 1.7.0 org.apache.kafka kafka-clients 1.0.0 maven-assembly-plugin 2.2 hgs.core.sk.StormKafkaMainTest jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 1.8 1.8
以上就是如何进行storm1.1.3与kafka1.0.0整合,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
消费
整合
内容
地址
更多
知识
篇文章
链接
实用
从头
代表
参数
就是
工作会
意义
数组
文件
文章
日志
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库表约束表达式
现在建立数据库的常用软件
企业十四五网络安全规划重点
计算机软件开发前端和后端
中国中医案例数据库
华悦智能门服务器地址
excel服务器2015
sql文件如何查看数据库名字
华为台式机服务器
网络安全管理工程师工资
常州网络安全实战
奕城连接不上服务器
西安平台软件开发
湖北pdu服务器专用电源购买
什么叫数字货币网络技术
关于网络安全的博文英语结尾
软件开发技术靠谱吗
网络安全宣传主题海报宣传画
沈阳网络安全报案
网络安全画报简单漂亮
泰国服务器
数据库单个的数据是以什么存放的
c服务器开发书籍
sql获取数据库日志名称
数据库无法导入xlsx
广州软件开发收费
怎么检查服务器的安全性
长治市网络安全宣传活动
怎么查看服务器的端口号
衢州品牌网络技术哪家好