(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本期内容:1、Executor的WAL容错机制2、消息重放Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?原因是计算的时候Spark Streaming是借助于Sp
千家信息网最后更新 2025年12月01日(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
本期内容:
1、Executor的WAL容错机制
2、消息重放
Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?
原因是计算的时候Spark Streaming是借助于Spark Core上RDD的安全容错的,所以天然的安全可靠的。
Executor的安全容错主要有:
1、数据副本:
有两种方式:a.借助底层的BlockManager,BlockManager做备份,通过传入的StorageLevel进行备份。
b. WAL方式进行容错。
2、接受到数据之后,不做副本,但是数据源支持存放,所谓存放就是可以反复的读取源数据。
容错的弊端:耗时间、耗空间。
简单的看下源代码:
/** Store block and report it to driver */def pushAndReportBlock( receivedBlock: ReceivedBlock,metadataOption: Option[Any],blockIdOption: Option[StreamBlockId] ) {val blockId = blockIdOption.getOrElse(nextBlockId)val time = System.currentTimeMillisval blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")val numRecords = blockStoreResult.numRecordsval blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId")}privateval receivedBlockHandler: ReceivedBlockHandler = {if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {if (checkpointDirOption.isEmpty) {throw new SparkException("Cannot enable receiver write-ahead log without checkpoint directory set. " +"Please use streamingContext.checkpoint() to set the checkpoint directory. " +"See documentation for more details.") }new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通过WAL容错 } else {new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通过BlockManager进行容错 }}def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {var numRecords = None: Option[Long]val putResult: Seq[(BlockId, BlockStatus)] = block match {case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)case IteratorBlock(iterator) =>val countIterator = new CountingIterator(iterator)val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true) numRecords = countIterator.count putResultcase ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)case o =>throw new SparkException(s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") }if (!putResult.map { _._1 }.contains(blockId)) {throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel") }BlockManagerBasedStoreResult(blockId, numRecords)}简单流程图:
参考博客:http://blog.csdn.net/hanburgud/article/details/51471089
容错
安全
数据
副本
备份
方式
内容
博客
原因
天然
就是
底层
弊端
数据源
时候
机制
流程
流程图
消息
源代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数钥网络技术有限公司陈小姐
电商用什么软件开发
刀片服务器改装台式机
网络安全反诈措施
荒野乱斗 服务器 域名
mc服务器软件终止连接
管家婆创业版非法数据库文件
全境封锁香港服务器
大一数据库技术及应用作业
数据库课程设计报告 vb
软件开发和接口开发
广电网络技术员面试题目
港服psn服务器地址
DB2数据库表的面试题
洛阳洛龙区软件开发
为提高软件开发效率应采用
超威服务器三个灯图解
虚荣游戏服务器连不上
网络安全wow
华为gpu服务器收费标准
瑞松科技工业互联网
审计学的相关数据库
mc服务器软件终止连接
大一数据库技术及应用作业
速达数据库损坏
杨浦区专业软件开发采购
网络安全 罗春
腾讯云服务器价格表
网络安全宣传周正式启动
易语言外部数据库和树形框