第13课:Spark Streaming源码解读之Drive
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本期内容:ReceivedBlockTracker容错安全性DStream和JobGenerator容错安全性Driver的容错有两个层面:1. Receiver接收数据的元数据 2. Driver管
千家信息网最后更新 2025年12月03日第13课:Spark Streaming源码解读之Drive
本期内容:
ReceivedBlockTracker容错安全性
DStream和JobGenerator容错安全性
Driver的容错有两个层面:1. Receiver接收数据的元数据 2. Driver管理的各组件信息(调度和驱动层面)
元数据采用了WAL的容错机制
case AddBlock(receivedBlockInfo) => if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) { walBatchingThreadPool.execute(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { if (active) { context.reply(addBlock(receivedBlockInfo)) } else { throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.") } } }) } else { context.reply(addBlock(receivedBlockInfo)) } ... /** Add new blocks for the given stream */private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { receivedBlockTracker.addBlock(receivedBlockInfo)}元数据其实是交由ReceivedBlockTracker管理的。
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) if (writeResult) { synchronized { getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } logDebug(s"Stream ${receivedBlockInfo.streamId} received " + s"block ${receivedBlockInfo.blockStoreResult.blockId}") } else { logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") } writeResult } catch { case NonFatal(e) => logError(s"Error adding block $receivedBlockInfo", e) false }}首先会调用writeToLog方法:
/** Write an update to the tracker to the write ahead log */private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis()) true } catch { case NonFatal(e) => logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e) false } } else { true }}然后再将数据写入streamIdToUnallocatedBlockQueue 队列中。
每隔batchInterval时间后,Streaming的job被触发运行。此时要将streamIdToUnallocatedBlockQueue队列中的数据分配给具体的某个time。
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { val streamIdToBlocks = streamIds.map { streamId => (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } else { logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } } else { // This situation occurs when: // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent, // possibly processed batch job or half-processed batch job need to be processed again, // so the batchTime will be equal to lastAllocatedBatchTime. // 2. Slow checkpointing makes recovered batch time older than WAL recovered // lastAllocatedBatchTime. // This situation will only occurs in recovery time. logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") }}在此过程中也会写WAL日志
JobGenerator在每隔batchInterval时间,会被触发产生job
/** Generate jobs and perform checkpoint for the given `time`. */private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))}最后往消息循环队列中放一个DoCheckpoint的消息。
JobGenerator接到消息后:
/** Processes all events */private def processEvent(event: JobGeneratorEvent) { logDebug("Got event " + event) event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater) case ClearCheckpointData(time) => clearCheckpointData(time) }}/** Perform checkpoint for the give `time`. */private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) { if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater) }}根据ssc和time生成了一个Checkpoint对象。而ssc中有Driver的一切信息。所以当Driver崩溃后,能够根据Checkpoint数据来恢复Driver。
恢复的代码如下:
/** Restarts the generator based on the information in checkpoint */private def restart() { // If manual clock is being used for testing, then // either set the manual clock to the last checkpointed time, // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0) clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } val batchDuration = ssc.graph.batchDuration // Batches when the master was down, that is, // between the checkpoint and current restart timeval checkpointTime = ssc.initialCheckpoint.checkpointTimeval restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) val downTimes = checkpointTime.until(restartTime, batchDuration) logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", ")) // Batches that were unprocessed before failureval pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime } .distinct.sorted(Time.ordering) logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach { time => // Allocate the related blocks when recovering from failure, because some blocks that were // added but not allocated, are dangling in the queue after recovering, we have to allocate // those blocks to the next batch, which is the batch they were supposed to go. jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) } // Restart the timer timer.start(restartTime.milliseconds) logInfo("Restarted JobGenerator at " + restartTime)}备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
数据
容错
消息
队列
安全
安全性
层面
时间
管理
两个
中放
代码
件信息
信息
公众
内容
备注
大数
实战
对象
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
原油期货怎么交易软件开发
网络安全方面的挑战杯申报书
重庆云数网络技术服务有限公司
数据库有没有前途
ivx数据库查询系统
网络安全运维工程师防火墙
项目管理软件开发有几个阶段
ipv6服务器搭建v2
末日生存手机游戏无法连接服务器
时代朝阳数据库技术中心
文广新局网络安全自查报告
三一重工软件开发人员
数据库查询商品种类价格
小学四年级网络安全教案
软件开发工程师工资收入
开展等级保护之网络安全域
2021国家网络安全手抄报
r语言抓取亚马逊数据库
免费视频服务器源码
电驴如何选择服务器
mongondb备份数据库
数据库经历几个阶段
原装正版数据库
吉中多互联网科技有限公司
苹果服务器搭建
爱彼迎服务器
软件开发用的程序
鲜时光tv关注显示服务器异常
数据库管理平台 难点
入职计算机网络技术题库