千家信息网

DataSourceV2流处理方法是什么

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容介绍了"DataSourceV2流处理方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成
千家信息网最后更新 2025年12月02日DataSourceV2流处理方法是什么

本篇内容介绍了"DataSourceV2流处理方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

SparkSession结构化流处理最后其实是通过DataSet的writeStream触发执行的。这点与传统的spark sql方式是不一样的。writeStream会找到StreamingQueryManager的startQuery方法,然后一步步到MicroBatchExecution和ContinuousExecution。

核心点:MicroBatchExecution和ContinuousExecution里面会对StreamingRelationV2进行转换,转换成StreamingDataSourceV2Relation。而MicroBatchExecution和ContinuousExecution只有在StreamingQueryManager的createQuery方法中才会被使用到。那么这个StreamingQueryManager的createQuery方法会在哪里被使用到呢?跟踪代码会发现是DataStreamWriter中调用StreamingQueryManager的startQuery方法进而调用到createQuery方法的。

而DataStreamWriter是Dataset的writeStream创建的。

【以上说的是写入流的过程】。

关键类:BaseSessionStateBuilder,里面有analyzer的定义。

protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {    overrideval extendedResolutionRules: Seq[Rule[LogicalPlan]] =      new FindDataSourceTable(session) +:        new ResolveSQLOnFile(session) +:        new FallBackFileSourceV2(session) +:        DataSourceResolution(conf, this.catalogManager) +:        customResolutionRules    overrideval postHocResolutionRules: Seq[Rule[LogicalPlan]] =      new DetectAmbiguousSelfJoin(conf) +:        PreprocessTableCreation(session) +:        PreprocessTableInsertion(conf) +:        DataSourceAnalysis(conf) +:        customPostHocResolutionRules    overrideval extendedCheckRules: Seq[LogicalPlan => Unit] =      PreWriteCheck +:        PreReadCheck +:        HiveOnlyCheck +:        TableCapabilityCheck +:        customCheckRules  }

这里没有特别需要关注的,先忽略。

DataSourceV2是指spark中V2版本的结构化流处理引擎框架。这里说的逻辑计划就是StreamingDataSourceV2Relation,对应的物理计划分成两类:MicroBatchScanExec和ContinuousScanExec,两者的应用场景从取名上就可以分辨出来,一个是微批处理模式;另一个则是连续流模式。

我们先从物理计划开始解析。

这两个物理计划基于同一个父类:DataSourceV2ScanExecBase,先看看父类的代码:

关键代码:

override def doExecute(): RDD[InternalRow] = {    val numOutputRows = longMetric("numOutputRows")    inputRDD.map { r =>      numOutputRows += 1      r    }  }

子类需要重写inputRDD。

StreamExecution

两种重要的checkpoint属性:

val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

offsetLog是当前读取到哪个offset了,commitLog是当前处理到哪个Offset了。这两个Log非常重要,合在一起保证了Exactly-once语义。

MicroBatchScanExec

好了,先看看MicroBatchScanExec是怎么重写inputRDD的。

override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)  override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()  override lazy val inputRDD: RDD[InternalRow] = {    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)  }

有三个地方,第一个是重写Seq[InputPartition],调用stream的planInputPartitions方法,注意下这里的stream类型是MicroBatchStream;第二个是重写readerFactory,获得读取器工厂类;第三个重写是inputRDD,创建DataSourceRDD作为inputRDD,而前两步重写的Seq[InputPartition]和readerFactory作为DataSourceRDD的构造参数。

这里首先大概看下DataSourceRDD的功能是什么。

DataSourceRDD这个类的代码很短,很容易看清楚。最重要的就是compute方法,先给出全部代码:

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {    val inputPartition = castPartition(split).inputPartition    val reader: PartitionReader[_] = if (columnarReads) {      partitionReaderFactory.createColumnarReader(inputPartition)    } else {      partitionReaderFactory.createReader(inputPartition)    }    context.addTaskCompletionListener[Unit](_ => reader.close())    val iter = new Iterator[Any] {      private[this] var valuePrepared = false      override def hasNext: Boolean = {        if (!valuePrepared) {          valuePrepared = reader.next()        }        valuePrepared      }      override def next(): Any = {        if (!hasNext) {          throw new java.util.NoSuchElementException("End of stream")        }        valuePrepared = false        reader.get()      }    }    // TODO: SPARK-25083 remove the type erasure hack in data source scan    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])  }

先根据读取器工厂类创建一个PartitionReader,然后调用PartitionReader的get方法获取数据。就是这么简单了!

ContinuousScanExec

最后再看下ContinuousScanExec的定义。

override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)  override lazy val readerFactory: ContinuousPartitionReaderFactory = {    stream.createContinuousReaderFactory()  }  override lazy val inputRDD: RDD[InternalRow] = {    EpochCoordinatorRef.get(      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),      sparkContext.env)      .askSync[Unit](SetReaderPartitions(partitions.size))    new ContinuousDataSourceRDD(      sparkContext,      sqlContext.conf.continuousStreamingExecutorQueueSize,      sqlContext.conf.continuousStreamingExecutorPollIntervalMs,      partitions,      schema,      readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])  }

和微批处理模式MicroBatchScanExec类似,也有三个地方重写,第一个是重写Seq[InputPartition],调用stream的planInputPartitions方法,注意下这里的stream类型是ContinuousStream;第二个是重写readerFactory,获得读取器工厂类ContinuousPartitionReaderFactory;第三个重写是inputRDD,创建ContinuousDataSourceRDD作为inputRDD,而前两步重写的Seq[InputPartition]和readerFactory作为ContinuousDataSourceRDD的构造参数。

这里首先大概看下ContinuousDataSourceRDD的功能是什么。

ContinuousDataSourceRDD的代码和DataSourceRDD的基本差不多,直接看源码吧,这里就不细说了,也没啥好细说的,显得啰里啰唆。

对于Kafka来说,ContinuousDataSourceRDD和DataSourceRDD其实最终是一样的

"DataSourceV2流处理方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

方法 代码 处理 三个 重要 就是 工厂 模式 物理 读取器 两个 关键 内容 功能 参数 地方 更多 知识 类型 结构 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 为什么硕士毕业论文会传到数据库 哪个不是数据库设计建议的方法 校园订餐管理系统数据库 搭建网页服务器 android 机场网络技术专业 网络安全电信级 数据库日常备份周期 无线网络连接到服务器 数据库数据怎么保存到二维码中 在银行做软件开发对学历要求 金蝶k3打印凭证附单数据库 淄博包装软件开发价格 国外的网络安全知识 铁路网络安全工作要求 期货数据库创建及维护 学软件开发排名榜 计算机网络技术赚钱吗 大数据环境下的网络安全包括哪些 小学生网络安全班队会ppt 河南保护网络安全 sql数据库查询 不等于 网络安全哪个大学开设 网络安全法的网络运营者 用cmd如何连接数据库密码 巨之乾网络技术有限公司怎么样 数据库操作工具中文最新版 计算机数据库理论知识 网络安全日2020第几届 软件开发所需专业知识 魔兽怎么知道自己在哪个服务器
0