spark streaming窗口聚合操作后怎么管理offset
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要介绍"spark streaming窗口聚合操作后怎么管理offset",在日常操作中,相信很多人在spark streaming窗口聚合操作后怎么管理offset问题上存在疑惑,小编查阅
千家信息网最后更新 2025年12月01日spark streaming窗口聚合操作后怎么管理offset
这篇文章主要介绍"spark streaming窗口聚合操作后怎么管理offset",在日常操作中,相信很多人在spark streaming窗口聚合操作后怎么管理offset问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"spark streaming窗口聚合操作后怎么管理offset"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
对于spark streaming来说窗口操作之后,是无法管理offset的,因为offset的存储于HasOffsetRanges。只有kafkaRDD继承了他,所以假如我们对KafkaRDD进行了转化之后就无法再获取offset了。
还有窗口之后的offset的管理,也是很麻烦的,主要原因就是窗口操作会包含若干批次的RDD数据,那么提交offset我们只需要提交最近的那个批次的kafkaRDD的offset即可。如何获取呢?
对于spark 来说代码执行位置分为driver和executor,我们希望再driver端获取到offset,在处理完结果提交offset,或者直接与结果一起管理offset。
说到driver端执行,其实我们只需要使用transform获取到offset信息,然后在输出操作foreachrdd里面使用提交即可。
package bigdata.spark.SparkStreaming.kafka010import java.util.Propertiesimport org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}import org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, TaskContext}import scala.collection.JavaConverters._import scala.collection.mutableobject kafka010NamedRDD {def main(args: Array[String]) {// 创建一个批处理时间是2s的context 要增加环境变量val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[*]")val ssc = new StreamingContext(sparkConf, Seconds(5))ssc.checkpoint("/opt/checkpoint")// 使用broker和topic创建DirectStreamval topicsSet = "test".split(",").toSetval kafkaParams = Map[String, Object]("bootstrap.servers" -> "mt-mdh.local:9093","key.deserializer"->classOf[StringDeserializer],"value.deserializer"-> classOf[StringDeserializer],"group.id"->"test4","auto.offset.reset" -> "latest","enable.auto.commit"->(false: java.lang.Boolean))// 没有接口提供 offsetval messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams,getLastOffsets(kafkaParams ,topicsSet)))//var A:mutable.HashMap[String,Array[OffsetRange]] = new mutable.HashMap()val trans = messages.transform(r =>{val offsetRanges = r.asInstanceOf[HasOffsetRanges].offsetRangesA += ("rdd1"->offsetRanges)r}).countByWindow(Seconds(10), Seconds(5))trans.foreachRDD(rdd=>{if(!rdd.isEmpty()){val offsetRanges = A.get("rdd1").get//.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}println(rdd.count())println(offsetRanges)// 手动提交offset ,前提是禁止自动提交messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}// A.-("rdd1")})// 启动流ssc.start()ssc.awaitTermination()}def getLastOffsets(kafkaParams : Map[String, Object],topics:Set[String]): Map[TopicPartition, Long] ={val props = new Properties()props.putAll(kafkaParams.asJava)val consumer = new KafkaConsumer[String, String](props)consumer.subscribe(topics.asJavaCollection)paranoidPoll(consumer)val map = consumer.assignment().asScala.map { tp =>println(tp+"---" +consumer.position(tp))tp -> (consumer.position(tp))}.toMapprintln(map)consumer.close()map}def paranoidPoll(c: Consumer[String, String]): Unit = {val msgs = c.poll(0)if (!msgs.isEmpty) {// position should be minimum offset per topicpartitionmsgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>val tp = new TopicPartition(m.topic, m.partition)val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)acc + (tp -> off)}.foreach { case (tp, off) =>c.seek(tp, off)}}}}
到此,关于"spark streaming窗口聚合操作后怎么管理offset"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
管理
学习
批次
更多
结果
帮助
实用
接下来
代码
位置
信息
前提
原因
变量
只有
就是
手动
接口
数据
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
七日杀服务器怎么安装
贵州网络技术服务采购
网信办网络安全工作思路
海南服务管理软件开发
奥的斯用服务器呼梯方法
银行储蓄系统数据库管理软件
mariadb数据库学习视频
本地服务器网络
网络安全必读书
python软件开发工资
战地5怎么管理自己的服务器
SQL数据库怎么重新安装
数据库怎么处理过期机制
网络安全周 奖品
如何查看服务器是否安装yum
晋安区医院 网络安全 项目
安卓串口i软件开发
严格落实网络安全管理措施
医学常用的外文文献数据库
服务器3389
黄金岛与服务器
udc服务器成本
数据库页8k与32k的差别
战舰世界服务器排名查询网址
数据库设计教学
迁安咨询网络技术不二之选
网络安全自护小提示
妖精的尾巴服务器爆满怎么办
126企业邮箱收件服务器
现代战舰私人服务器