spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通
千家信息网最后更新 2025年12月03日spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据
spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.kafka.KafkaClusterimport scala.collection.immutable.Mapimport java.util.NoSuchElementExceptionimport org.apache.spark.SparkExceptionimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport org.codehaus.jackson.map.deser.std.PrimitiveArrayDeserializers.StringDeserimport kafka.serializer.StringDecoderimport org.apache.spark.streaming.kafka.DirectKafkaInputDStreamimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.kafka.HasOffsetRangesimport org.apache.spark.HashPartitionerobject SparkStreamingKafkaDirectWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[5]") conf.set("spark.streaming.kafka.maxRatePerPartition", "1") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(1)) ssc.checkpoint("d:\\checkpoint") val kafkaParams = Map[String,String]( "metadata.broker.list"->"bigdata01:9092,bigdata02:9092,bigdata03:9092", "group.id"->"group_hgs", "zookeeper.connect"->"bigdata01:2181,bigdata02:2181,bigdata03:2181") val kc = new KafkaCluster(kafkaParams) val topics = Set[String]("test") //每个rdd返回的数据是(K,V)类型的,该函数规定了函数返回数据的类型 val mmdFunct = (mmd: MessageAndMetadata[String, String])=>(mmd.topic+" "+mmd.partition,mmd.message()) val rds = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc, kafkaParams, getOffsets(topics,kc,kafkaParams),mmdFunct) val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } val words = rds.flatMap(x=>x._2.split(" ")).map((_,1)) //val wordscount = words.map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true) //println(getOffsets(topics,kc,kafkaParams)) rds.foreachRDD(rdd=>{ if(!rdd.isEmpty()){ //对每个dataStreamoffset进行更新 upateOffsets(topics,kc,rdd,kafkaParams) } } ) words.print() ssc.start() ssc.awaitTermination() } def getOffsets(topics : Set[String],kc:KafkaCluster,kafkaParams:Map[String,String]):Map[TopicAndPartition, Long]={ val topicAndPartitionsOrNull = kc.getPartitions(topics) if(topicAndPartitionsOrNull.isLeft){ throw new SparkException(s"$topics in the set may not found") } else{ val topicAndPartitions = topicAndPartitionsOrNull.right.get val groups = kafkaParams.get("group.id").get val offsetOrNull = kc.getConsumerOffsets(groups, topicAndPartitions) if(offsetOrNull.isLeft){ println(s"$groups you assignment may not exists!now redirect to zero!") //如果没有消费过,则从最开始的位置消费 val erliestOffset = kc.getEarliestLeaderOffsets(topicAndPartitions) if(erliestOffset.isLeft) throw new SparkException(s"Topics and Partions not definded not found!") else erliestOffset.right.get.map(x=>(x._1,x._2.offset)) } else{ //如果消费组已经存在则从记录的地方开始消费 offsetOrNull.right.get } } } //每次拉取数据后存储offset到ZK def upateOffsets(topics : Set[String],kc:KafkaCluster,directRDD:RDD[(String,String)],kafkaParams:Map[String,String]){ val offsetRanges = directRDD.asInstanceOf[HasOffsetRanges].offsetRanges for(offr <-offsetRanges){ val topicAndPartitions = TopicAndPartition(offr.topic,offr.partition) val yesOrNo = kc.setConsumerOffsets(kafkaParams.get("group.id").get, Map(topicAndPartitions->offr.untilOffset)) if(yesOrNo.isLeft){ println(s"Error when update offset of $topicAndPartitions") } } } }/* val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(4)) val kafkaParams = Map[String,String]( "metadata.broker.list"->"bigdata01:9092,bigdata02:9092,bigdata03:9092") val kc = new KafkaCluster(kafkaParams) //获取topic与paritions的信息 //val tmp = kc.getPartitions(Set[String]("test7")) //结果:topicAndPartitons=Set([test7,0], [test7,1], [test7,2]) //val topicAndPartitons = tmp.right.get //println(topicAndPartitons) //每个分区对应的leader信息 //val tmp = kc.getPartitions(Set[String]("test7")) //val topicAndPartitons = tmp.right.get //结果:leadersPerPartitions= Right(Map([test7,0] -> (bigdata03,9092), [test7,1] -> (bigdata01,9092), [test7,2] -> (bigdata02,9092))) //val leadersPerPartitions = kc.findLeaders(topicAndPartitons) //println(leadersPerPartitions) //每增加一条消息,对应的partition的offset都会加1,即LeaderOffset(bigdata02,9092,23576)第三个参数会加一 //val tmp = kc.getPartitions(Set[String]("test")) //val topicAndPartitons = tmp.right.get //结果t= Right(Map([test7,0] -> LeaderOffset(bigdata03,9092,23568), [test7,2] -> LeaderOffset(bigdata02,9092,23576), [test7,1] -> LeaderOffset(bigdata01,9092,23571))) //val t = kc.getLatestLeaderOffsets(topicAndPartitons) // println(t) //findLeader需要两个参数 topic 分区编号 //val tmp = kc.findLeader("test7",0) //结果leader=RightProjection(Right((bigdata03,9092))) //val leader = tmp.right //val tp = leader.flatMap(x=>{Either.cond(false, None,(x._1,x._2))}) val tmp = kc.getPartitions(Set[String]("test")) val ttp = tmp.right.get while(true){ try{ val tp = kc.getConsumerOffsets("group_test1", ttp) val maps = tp.right.get println(maps) Thread.sleep(2000) } catch{ case ex:NoSuchElementException=>{println("test")} } }*/看完上述内容,你们掌握spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
方式
结果
消费
处理
信息
内容
函数
参数
方法
更多
类型
问题
加一
束手无策
为此
三个
两个
位置
原因
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发工程师的潜在工作部门
服务器安全策略下载文件
小旋风php服务器
多个微信小程序共用一个服务器
如何做好通信网络安全
字超少的网络安全小报带字的
g200e服务器显卡驱动
上海互联网软件开发不二之选
密山dns服务器地址
叮咚买菜云服务器
内存数据库sqlite使用
服务器无法复制文件
恩施管理软件开发价格
内网构建服务器
中科可控服务器文档
手机诈骗网络安全ppt
不属于关系型数据库的是
对接数据库需要知道哪些信息
考勤现实不可识别数据库
用友t3数据库后台怎么进
网络安全入侵科普
st芯片软件开发入门
计算机网络技术竞争力大不大
万方数据库硕士论文如何替换
冒险岛数据库恢复教程
网络安全顺口溜八句
我的世界网易服务器手机进不去
电脑与服务器链接出现问题
团队软件开发工具
鼓楼区提供软件开发创新服务