生产常用Spark累加器剖析之三(自定义累加器)
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,思路 & 需求参考IntAccumulatorParam的实现思路(上述文章中有讲):trait AccumulatorParam[T] extends AccumulableParam[T, T]
千家信息网最后更新 2025年12月02日生产常用Spark累加器剖析之三(自定义累加器)
思路 & 需求
参考IntAccumulatorParam的实现思路(上述文章中有讲):
trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { // addInPlace有很多具体的实现类 // 如果想要实现自定义的话,就得实现这个方法 addInPlace(t1, t2) }}自定义也可以通过这个方法去实现,从而兼容我们自定义的累加器
需求:这里实现一个简单的案例,用分布式的方法去实现随机数
** * 自定义的AccumulatorParam * * Created by lemon on 2018/7/28. */object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] { override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = { // ++用于两个集合相加 r1++r2 } override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = { var data: Map[Int, Int] = Map() data }}/** * 使用自定义的累加器,实现随机数 * * Created by lemon on 2018/7/28. */object CustomAccumulator { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]") val sc = new SparkContext(sparkConf) val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator) val distData = sc.parallelize(1 to 10) val mapCount = distData.map(x => { val randomNum = new Random().nextInt(20) // 构造一个k-v对 val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum) uniqueKeyAccumulator += map }) println(mapCount.count()) // 获取到累加器的值 中的key值,并进行打印 uniqueKeyAccumulator.value.keys.foreach(println) sc.stop() }}运行结果如下图:## 思路 & 需求
参考IntAccumulatorParam的实现思路(上述文章中有讲):
trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { // addInPlace有很多具体的实现类 // 如果想要实现自定义的话,就得实现这个方法 addInPlace(t1, t2) }}自定义也可以通过这个方法去实现,从而兼容我们自定义的累加器
需求:这里实现一个简单的案例,用分布式的方法去实现随机数
** * 自定义的AccumulatorParam * * Created by lemon on 2018/7/28. */object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] { override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = { // ++用于两个集合相加 r1++r2 } override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = { var data: Map[Int, Int] = Map() data }}/** * 使用自定义的累加器,实现随机数 * * Created by lemon on 2018/7/28. */object CustomAccumulator { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]") val sc = new SparkContext(sparkConf) val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator) val distData = sc.parallelize(1 to 10) val mapCount = distData.map(x => { val randomNum = new Random().nextInt(20) // 构造一个k-v对 val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum) uniqueKeyAccumulator += map }) println(mapCount.count()) // 获取到累加器的值 中的key值,并进行打印 uniqueKeyAccumulator.value.keys.foreach(println) sc.stop() }}运行结果如下图:
累加器
方法
思路
随机数
需求
两个
分布式
可以通过
文章
案例
结果
得实
参考
运行
常用
剖析
生产
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
l4d2怎么开服务器
服务器错误请通知管理员
网络安全清单化管理制度
计算机网络技术与火箭
福建电信网络服务器虚拟主机
数据库原理和数据库技术
市教育信息化与网络安全领导小组
英国知名网络安全公司
服务器的管理员怎么设置
网络数据库营销特点
好奇数据库HCIA考证
长沙智能软件开发工程师
24小时news金融网络安全
互联网是高科技企业吗
无线网络技术从哪几个方面分类
2020年黑魂3连接不了服务器
超市数据库功能流程图
山东金桥网络技术
数据库中运行脚本查询
王者未连接服务器是什么意思
基因组数据库的优势
网络安全可靠化
广州智慧网络技术有限公司
网络安全如何影响组织
数据库表管理关键技术
蜜蜂剪辑软件开发
汕尾专业软件开发供应商
网络安全协议分析 张连成
数据库性别怎么添加约束
翼道服务器安装centos系统