Spark2.3.1+Kafka0.9使用Direct模式消费信息异常怎么办
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Spark2.3.1+Kafka0.9使用Direct模式消费信息异常怎么办,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Spark2.
千家信息网最后更新 2025年12月03日Spark2.3.1+Kafka0.9使用Direct模式消费信息异常怎么办
Spark2.3.1+Kafka0.9使用Direct模式消费信息异常怎么办,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
Spark2.3.1+Kafka使用Direct模式消费信息
Maven依赖
org.apache.spark spark-streaming-kafka-0-8_2.11 2.3.1 org.apache.spark spark-streaming_2.11 2.3.1
2.3.1即spark版本
Direct模式代码
import kafka.serializer.StringDecoderimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}object Test { val zkQuorum = "mirrors.mucang.cn:2181" val groupId = "nginx-cg" val topic = Map("nginx-log" -> 1) val KAFKA_INTERVAL = 10 case class NginxInof(domain: String, ip: String) def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("NginxLogAnalyze").setMaster("local[*]") val sparkContext = new SparkContext(sparkConf) val streamContext = new StreamingContext(sparkContext, Seconds(KAFKA_INTERVAL)) val kafkaParam = Map[String, String]( "bootstrap.servers" -> "xx.xx.cn:9092", "group.id" -> "nginx-cg", "auto.offset.reset" -> "largest" ) val topic = Set("nginx-log") val kafkaStream = KafkaUtils.createDirectStream(streamContext, kafkaParam, topic) val counter = kafkaStream .map(_.toString().split(" ")) .map(item => (item(0).split(",")(1) + "-" + item(2), 1)) .reduceByKey((x, y) => (x + y)) counter.foreachRDD(rdd => { rdd.foreach(println) }) streamContext.start() streamContext.awaitTermination() }}largest 因为kafka版本过低不支持latest
异常信息
Caused by: java.lang.NoSuchMethodException: scala.runtime.Nothing$.(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Class.java:3082) at java.lang.Class.getConstructor(Class.java:1825) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator. (KafkaRDD.scala:153) at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) ... 3 more
解决方案
在验证kafka属性时不能使用scala默认的类,需要指定kafka带的类createDirectStream[String, String, StringDecoder, StringDecoder]其中StringDecoder必须是kafka.serializer.StringDecoder
看完上述内容,你们掌握Spark2.3.1+Kafka0.9使用Direct模式消费信息异常怎么办的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
信息
模式
消费
怎么办
内容
方法
更多
版本
问题
束手无策
为此
代码
原因
对此
属性
技能
方案
篇文章
经验
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
fiery服务器与EFI公司
学习通数据库技术答案
北京华为软件开发
网络安全动漫图片
原神服务器维修要多久
会软件开发需要学什么
serv复制到ftp服务器
网络安全体系设计论证的主要方法
游戏软件开发策划工资
计算机网络技术多少分能上
vc连接数据库
杭州学习网络技术的学校
饥荒服务器管理员放哪里
网络安全全法手抄报图片
随州网络安全手抄报
华为手机激活显示网络服务器繁忙
国际mc如何开服务器
自己的服务器
软件开发网上商城范围
ncre三级数据库成绩
华为云软件开发区域划分
烟台龙口dns的服务器地址
未成年攻击服务器
泰安网络安全等级保护测评采购
网络安全个人观点
汤森路透并购数据库
计算机网络技术与应用张建忠
云南ios软件开发
数据库技术在工程学中的应用
网络安全手抄报的名言