(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,hu本期内容:1、Kafka解密背景:目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作
千家信息网最后更新 2025年12月04日(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
hu本期内容:
1、Kafka解密
背景:
目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的。
所以Spark Streaming就产生了自定义RDD -> KafkaRDD.
源码分析:
1、KafkaRDD源码
private[kafka]class KafkaRDD[K: ClassTag,V: ClassTag,U <: Decoder[_]: ClassTag,T <: Decoder[_]: ClassTag,R: ClassTag] private[spark] ( sc: SparkContext,kafkaParams: Map[String, String],val offsetRanges: Array[OffsetRange], //指定数据范围leaders: Map[TopicAndPartition, (String, Int)],messageHandler: MessageAndMetadata[K, V] => R) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) =>val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }2、HasOffsetRanges
/** * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the * offset ranges in RDDs generated by the direct Kafka DStream (see * [[KafkaUtils.createDirectStream()]]). * {{{* KafkaUtils.createDirectStream(...).foreachRDD { rdd => * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges * ... * } * }}}*/trait HasOffsetRanges {def offsetRanges: Array[OffsetRange]}3、KafkaRDD中的compute
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {val part = thePart.asInstanceOf[KafkaRDDPartition]assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))if (part.fromOffset == part.untilOffset) { log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +s"skipping ${part.topic} ${part.partition}")Iterator.empty} else {new KafkaRDDIterator(part, context) }}SparkStreaming一般使用KafkaUtils的createDirectStream读取数据
def createDirectStream[K: ClassTag,V: ClassTag,KD <: Decoder[K]: ClassTag,VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext,kafkaParams: Map[String, String],topics: Set[String]): InputDStream[(K, V)] = {val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)val kc = new KafkaCluster(kafkaParams)val fromOffsets = getFromOffsets(kc, kafkaParams, topics)new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler)}4、通过getFromOffsets的方法获取topic的fromOffset值
[kafka] ( kc: KafkaClusterkafkaParams: []topics: [] ): [TopicAndPartition] = {reset = kafkaParams.get().map(_.toLowerCase)result = { topicPartitions <- kc.getPartitions(topics).right leaderOffsets <- ((reset == ()) { kc.getEarliestLeaderOffsets(topicPartitions) } { kc.getLatestLeaderOffsets(topicPartitions) }).right } { leaderOffsets.map { (tplo) => (tplo.offset) } } KafkaCluster.(result)}createDirectStream其实生成的是DirectKafkaInputDStream对象,通过compute方法会产生KafkaRDD
(validTime: Time): Option[KafkaRDD[]] = {untilOffsets = clamp(latestLeaderOffsets())rdd = []( context.sparkContextkafkaParamsuntilOffsetsmessageHandler)offsetRanges = .map { (tpfo) =>uo = untilOffsets(tp)(tp.topictp.partitionfouo.offset) }description = offsetRanges.filter { offsetRange =>offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange =>{offsetRange.topic}{offsetRange.partition}+{offsetRange.fromOffset}{offsetRange.untilOffset}}.mkString()metadata = (-> offsetRanges.toListStreamInputInfo.-> description)inputInfo = (rdd.countmetadata) ssc...reportInfo(validTimeinputInfo)= untilOffsets.map(kv => kv._1 -> kv._2.offset)(rdd)}采用Direct的好处?
1. Direct方式没有数据缓存,因此不会出现内存溢出,但是如果采用Receiver的话就需要缓存。
2. 如果采用Receiver的方式,不方便做分布式,而Direct方式默认数据就在多台机器上。
3. 在实际操作的时候如果采用Receiver的方式的弊端是假设数据来不及处理,但是Direct就不会,因为是直接读取数据。
4. 语义一致性,Direct的方式数据一定会被执行。
数据
方式
源码
一致
一致性
方法
来源
缓存
语义
企业
内存
内容
分布式
制度
多台
好处
实际
对象
弊端
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
教育系统网络安全检查报告
ics和代理服务器有什么不同
应急预案数据库
网络安全技术检查
ip连接mysql数据库
大脚插件进不了燃烧远征服务器
视图在数据库安全中的作用
一个网站的服务器需要多少核
幼儿园网络安全图文
有关俄罗斯的数据库
金圭网络技术有限公司
2021年软件开发工作好找吗
战地5服务器怎么进游戏
科研数据库技术指标
红颜瑟网络技术
做渲染的服务器
网络安全竞赛官网
r语言 服务器
传统的软件开发采用
余姚企业服务器
电脑租一个mc服务器多少钱
数据库签到系统设计
网络安全大牛的学历
服务器可以用键鼠吗
燕夏服务器
qq 合并旧数据库
密云区一站式网络技术服务优点
个人电脑装服务器系统
虚拟网络安全学校
网络安全密室大逃脱