9.spark core之共享变量
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,简介 spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。每个
千家信息网最后更新 2025年12月02日9.spark core之共享变量
简介
spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
- 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
- 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
- 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
- 累加器用于在驱动器中对数据结果进行聚合。
广播变量
原理
- 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
用法
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
- 通过value属性访问该对象的值
- 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
实例
查询每个国家的呼号个数
python
# 将呼号前缀(国家代码)作为广播变量signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count)countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")scala
// 将呼号前缀(国家代码)作为广播变量val signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count)val countryContactCounts = contactCounts.map{case (sign, count) => { val country = lookupInArray(sign, signPrefixes.value) (country, count) }}.reduceByKey((x, y) => x+y)countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")java
// 将呼号前缀(国家代码)作为广播变量final Broadcast signPrefixes = sc.broadcast(loadCallSignTable());JavaPairRDD countryContactCounts = contactCounts.mapToPair(new PairFunction, String, Integer>() { public Tuple2 call(Tuple2 callSignCount) { String sign = callSignCount._1(); String country = lookupCountry(sign, signPrefixes.value()); return new Tuple2(country, callSignCount._2()); }}).reduceByKey(new SumInts());countryContactCounts.saveAsTextFile(outputDir + "/countries.txt"); 累加器
原理
- 累加器在Driver端定义赋初始值。
- 累加器只能在Driver端读取最后的值,在Excutor端更新。
用法
- 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
- 驱动器程序可以调用累加器的value属性来访问累加器的值
实例
累加空行
python
file = sc.textFile(inputFile)# 创建Accumulator[Int]并初始化为0blankLines = sc.accumulator(0)def extractCallSigns(line): global blankLines # 访问全局变量 if (line == ""): blankLines += 1 return line.split(" ")callSigns = file.flatMap(extractCallSigns)callSigns.saveAsTextFile(outputDir + "/callsigns")print "Blank lines: %d" % blankLines.valuescala
val file = sc.textFile("file.txt")val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0val callSigns = file.flatMap(line => { if (line == "") { blankLines += 1 //累加器加1 } line.split(" ")})callSigns.saveAsTextFile("output.txt")println("Blank lines:" + blankLines.value)java
JavaRDD rdd = sc.textFile(args[1]);final Accumulator blankLines = sc.accumulator(0);JavaRDD callSigns = rdd.flatMap(new FlatMapFunction() { public Iterable call(String line) { if ("".equals(line)) { blankLines.add(1); } return Arrays.asList(line.split(" ")); }});callSigns.saveAsTextFile("output.text");System.out.println("Blank lines:" + blankLines.value()); 忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。
变量
累加器
广播
驱动器
驱动
对象
代码
副本
呼号
国家
类型
前缀
任务
原理
实例
属性
技术
数据
方式
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
存储和服务器的启动顺序
数据库补丁sp4
学网络安全要学英语
校园网络安全手抄报中学生
校园软件开发简介
不能设置数据库的安全性
引领网络安全教育
数据库管理软件中文
滨湖到吴山中间有没有服务器
数据库例题专升本
软件开发个税怎么交
鄂尔多斯网络安全题
国际服务器数量
什么是外部数据库
绿园区网络技术咨询质量推荐
我的世界斗罗封神服务器QQ群号
非Oracle数据库如何布署
dell电源服务器
电子股软件开发
海岩小说软件开发
软件开发专业的前言
想去学习网络技术
电厂网络安全区
黎明觉醒服务器会爆满么
四川微信分销软件开发
eset 服务器版
软件开发技术选型标准是什么
服务器怎么防御
数据库中小数类型是什么
网络安全最好的是哪家