storm-kafka-client使用的示例分析
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package hgs.core.sk;
千家信息网最后更新 2025年12月02日storm-kafka-client使用的示例分析
storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
package hgs.core.sk;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.spout.ByTopicRecordTranslator;import org.apache.storm.kafka.spout.KafkaSpout;import org.apache.storm.kafka.spout.KafkaSpoutConfig;import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;//参考如下//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52public class StormKafkaMainTest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //该类将传入的kafka记录转换为storm的tuple ByTopicRecordTranslator brt = new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7")); //设置要消费的topic即test7 brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7")); //类似之前的SpoutConfig KafkaSpoutConfig ksc = KafkaSpoutConfig //bootstrapServers 以及topic(test7) .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7") //设置group.id .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test") //设置开始消费的气势位置 .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST) //设置提交消费边界的时长间隔 .setOffsetCommitPeriodMs(10_000) //Translator .setRecordTranslator(brt) .build(); builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2); builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(0); try { StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } /* LocalCluster cu = new LocalCluster(); cu.submitTopology("test", config, builder.createTopology());*/ }}class MyboltO extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容 String out = input.getString(0); System.out.println(out); //collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } } pom.xml文件
4.0.0 hgs core.sk 1.0.0-SNAPSHOT jar core.sk http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-kafka-client 1.1.3 org.apache.storm storm-core 1.1.3 provided org.apache.kafka kafka_2.11 1.0.0 org.slf4j slf4j-log4j12 org.apache.zookeeper zookeeper org.clojure clojure 1.7.0 org.apache.kafka kafka-clients 1.0.0 maven-assembly-plugin 2.2 hgs.core.sk.StormKafkaMainTest jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 1.8 1.8
//以下为lambda表达式,因为在上面用大了,所以在这儿记录一下,以免以后看不懂import java.util.UUID;import org.junit.jupiter.api.Test;public class TEst { @Test public void sysConfig() { String[] ags = {"his is my first storm program so i hope it will success", "i love bascketball", "the day of my birthday i was alone"}; String uuid = UUID.randomUUID().toString(); String nexttuple= ags[new Random().nextInt(ags.length)]; System.out.println(nexttuple); } @Test public void lambdaTest() { int b = 100; //该出返回10*a的值、 //"(a) -> 10*a" 相当于 new testinter(); printPerson((a) -> 10*a) ; } void printPerson( testinter t) { //穿过来的t需要一个参数a 即下面借口中定义的方法sysoutitems(int a ) System.out.println(t.sysoutitems(100)); }; }//定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法interface testinter{ T sysoutitems(int a ); //void aAndb(int a, int b );} 看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
方法
借口
内容
消费
示例
分析
更多
表达式
问题
束手无策
为此
位置
原因
参数
对此
技能
接口
文件
日志
时长
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
汇智服务器如何做raid
dnf 与服务器断开连接
小型服务器能不能让上网更快
南昌大同欣网络技术有限公司
数据库给用户赋权限
vr软件开发时间
我的世界服务器新手问答指令
my sql数据库实训报告
足球经理2010数据库制作
社团网络技术部的面试问题
利用个人pc搭建服务器
上海子健网络技术有限公司
光遇国际服测试版服务器推荐
联想电脑服务器专卖
大数据软件开发哪家可靠
软件开发 取费
电脑软件开发入门自学教程
支付宝删除人脸数据库
黑龙江任务态势系统软件开发
上亿简历大数据库
长沙哪里有软件开发
淄川文件审批oa软件开发公司
数据库有哪些实现原理
在数据库中什么是实体集
网络安全配备工具
nacivat建数据库
eicu数据库住院时间
数据库中用什么表示实体
成绩差女孩子学软件开发怎么样
删除或者屏蔽信息网络技术