千家信息网

spark源码系列之累加器实现机制及自定义累加器

发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,一,基本概念累加器是Spark的一种变量,顾名思义该变量只能增加。有以下特点:1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。2,累加器不会改变Spark Lazy
千家信息网最后更新 2025年12月04日spark源码系列之累加器实现机制及自定义累加器

一,基本概念

累加器是Spark的一种变量,顾名思义该变量只能增加。有以下特点:

1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。

2,累加器不会改变Spark Lazy计算的特点。只会在Job触发的时候进行相关累加操作。

3,现有累加器的类型。
相信有很多学习大数据的道友,在这里我给大家说说我滴群哦,大数据海量知识分享,784789432.在此我保证,绝对大数据的干货,等待各位的到来,我们一同从入门到精通吧!

二,累加器的使用

Driver端初始化,并在Action之后获取值。

val accum = sc.accumulator(0, "test Accumulator")
accum.value

Executor端进行计算

accum+=1;

三,累加器的重点类

Class Accumulator extends Accumulable

主要是实现了累加器的初始化及封装了相关的累加器操作方法。同时在类对象构建的时候向我们的Accumulators注册了累加器。累加器的add操作的返回值类型和我们传入的值类型可以不一样。所以,我们一定要定义好如何累加和合并值。也即add方法

object Accumulators:

该方法在Driver端管理着我们的累加器,也包含了特定累加器的聚合操作。

trait AccumulatorParam[T] extends AccumulableParam[T, T]:

AccumulatorParam的addAccumulator操作的泛型封装,具体的实现还是要再具体实现类里面实现addInPlace方法。

object AccumulatorParam:

主要是进行隐式类型转换的操作。

TaskContextImpl:

在Executor端管理着我们的累加器。

四,累加器的源码解析

1,Driver端的初始化

val accum = sc.accumulator(0, "test Accumulator")

val acc = new Accumulator(initialValue, param, Some(name))

主要是在Accumulable(Accumulator)中调用了,这样我们就可以使用Accumulator使用了。

Accumulators.register(this)

2,Executor端的反序列化得到我们对象的过程

首先,我们的value_ 可以看到其并不支持序列化

@volatile @transient private var value_ : R = initialValue // Current value on master

其初始化是在我们反序列化的时候做的,反序列化还完成了Accumulator向我们的TaskContextImpl的注册

反序列化是在调用ResultTask的RunTask方法的时候做的

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

过程中会调用

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
}
}

3,累加器的累加

accum+=1;

param.addAccumulator(value_, term)

根据不同的累加器参数有不同的实现AccumulableParam

如,int类型。最终调用的AccumulatorParam特质的addAccumulator方法。

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}

然后,调用的是各个具体实现的addInPlace方法

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}

返回后更新了我们的Accumulators的value_的值。

4,Accumulator的各个节点累加的之后的聚合操作

在Task类的run方法里面得到并返回的

(runTask(context), context.collectAccumulators())

最终在DAGScheduler里面调用了updateAccumulators(event)

在updateAccumulators方法中

Accumulators.add(event.accumUpdates)

具体内容如下:

def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}

5,最后我们就可以获取到累加器的值了

accum.value

五,累加器使用注意事项

累加器不会改变我们RDD的Lazy的特性,之后再Action之后完成计算和更新。

但是假如出现两个Action公用一个转化操作,如map,在map里面进行累加器累加,那么每次action都会累加,造成某些我们不需要的结果。

六,自定义累加器

自定义累加器输出

七,总结

主要牵涉点就是序列化及类加载执行,这是深入玩spark的必须.
累加器 方法 序列 类型 时候 数据 是在 不同 变量 对象 特点 端的 结果 过程 封装 更新 管理 源码 顾名思义 两个 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 阿里巴巴公司网络技术 腾讯云服务器一核多少钱 杭州erp软件开发哪家正规 火车站报站软件开发 如何学习网络安全战略 徐州江苏服务器代理厂家虚拟主机 移动电子商务网络安全的类型 搭建红月服务器 服务器检车 记录小康工程黑龙江数据库 培养学生网络安全心得体会 电脑显示服务器白屏 教务管理系统是数据库管理系统吗 网络安全教育黑板报简单 学了数学该怎么进入软件开发 中科可控服务器管理口地址 学习通显示无法连接到服务器 我的世界精灵服务器库在哪 怎样做产品数据库 北京瑞星网络安全教育 四川数据网络技术服务工程 如何设计等级经验值数据库 上海岑茂网络技术有限公司 化学平衡常数数据库 上海博顼网络技术有限公司 苏州网络安全审计系统咨询中心 深圳市亿伯斯网络技术 外文论文数据库收录 2020网络安全宣传周答题 游戏数据库测试
0