spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个sc
千家信息网最后更新 2025年12月02日spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式
学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark
我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口。接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现:
一、非lambda实现的java spark wordcount程序:
public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //JavaPairRDD inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt", // TextInputFormat.class, LongWritable.class, Text.class); JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt"); JavaRDD wordsRDD = inputRDD.flatMap(new FlatMapFunction() { @Override public Iterator call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); JavaPairRDD keyValueWordsRDD = wordsRDD.mapToPair(new PairFunction() { @Override public Tuple2 call(String s) throws Exception { return new Tuple2(s, 1); } }); JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey(new HashPartitioner(2), new Function2() { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); //如果输出文件存在的话需要删除掉 File outputFile = new File("/Users/tangweiqun/wordcount"); if (outputFile.exists()) { File[] files = outputFile.listFiles(); for(File file: files) { file.delete(); } outputFile.delete(); } wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount"); System.out.println(wordCountRDD.collect()); }} 二、java8 lambda实现的wordcount代码
public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //JavaPairRDD inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt", // TextInputFormat.class, LongWritable.class, Text.class); JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt"); JavaRDD wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator()); JavaPairRDD keyValueWordsRDD = wordsRDD.mapToPair(word -> new Tuple2(word, 1)); JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b); //如果输出文件存在的话需要删除掉 File outputFile = new File("/Users/tangweiqun/wordcount"); if (outputFile.exists()) { File[] files = outputFile.listFiles(); for(File file: files) { file.delete(); } outputFile.delete(); } wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount"); System.out.println(wordCountRDD.collect()); }} 从上面可以看出,lambda的实现更加简洁,也可以看出一个lambda函数表达式就是一个java接口。
我们在http://7639240.blog.51cto.com/7629240/1966958提到的combineByKey,如下的代码:
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2); //当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function > createCombiner = new Function >() { @Override public Tuple2 call(Integer value) throws Exception { return new Tuple2<>(value, 1); }};//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2 , Integer, Tuple2 > mergeValue = new Function2 , Integer, Tuple2 >() { @Override public Tuple2 call(Tuple2 acc, Integer value) throws Exception { return new Tuple2<>(acc._1() + value, acc._2() + 1); } };//当需要对不同分区的数据进行聚合的时候应用这个函数Function2 , Tuple2 , Tuple2 > mergeCombiners = new Function2 , Tuple2 , Tuple2 >() { @Override public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception { return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()); } }; JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
可以写成如下的lambda实现的combineByKey:
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function > createCombiner = value -> new Tuple2<>(value, 1);//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2 , Integer, Tuple2 > mergeValue = (acc, value) ->new Tuple2<>(acc._1() + value, acc._2() + 1);//当需要对不同分区的数据进行聚合的时候应用这个函数Function2 , Tuple2 , Tuple2 > mergeCombiners = (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
如果想深入的系统的理解spark RDD api可以参考: spark core RDD api原理详解
函数
应用
时候
就是
接口
表达式
不同
代码
数据
文件
结果
参考
输出
简洁
接下来
例子
原理
技术
程序
系统
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
集彩网络技术有限公司
联想rs588服务器参数
达州税控盘服务器连接端口
思科零信任网络安全架构
信息技术选修1网络技术总结
滴滴车数据库
网络安全通用词汇
网络安全主任
网络安全测试流程
cs服务器搭建
facebook付款服务器域名
中国科技创新与世界互联网大会
全民网络安全主题
数据库中外码怎么定义
济水县有容网络技术
知网数据库项目
吉林正规软件开发价格检测中心
esb系统部署在服务器上吗
抚顺助友软件开发有限公司
一台戴尔服务器多少钱
关联数据库有哪些
数据库NLR
软件开发公司技术部的职责
软件开发 招投标
监控服务器多少瓦
数据防泄密软件开发公司案例
网络安全 电力企业
网络安全扫码
网络安全的重要性的英语
网络安全周是几月几号2020年