
Creating and Using Spark RDD Operators


Spark has been one of the hottest big data technologies in recent years. It offers many advantages, most notably speed, thanks to its in-memory computing model.

Without further ado, let's get straight into using Spark's RDD operators.

If you run into issues such as setting up a Spark environment, please look them up yourself; they are not covered in this article.

There are two ways to create a Spark RDD:

1> From a collection in the driver program (via parallelize); a new RDD can also be derived from an existing parent RDD through a transformation.

2> From an external data source (for example, a local file or HDFS). A minimal sketch of both creation styles follows below.
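
A minimal sketch of both creation styles, assuming a local Spark setup like the demo further down; the class name CreateRDDDemo and the input path "data.txt" are placeholders used only for illustration:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CreateRDDDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CreateRDDDemo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // 1> From a collection: parallelize distributes an in-memory list into an RDD
        JavaRDD<Integer> fromCollection = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println(fromCollection.count());

        // 2> From external storage: textFile reads a local file or an HDFS path
        // ("data.txt" is a hypothetical path, replace it with a real one)
        JavaRDD<String> fromFile = jsc.textFile("data.txt");
        System.out.println(fromFile.count());

        jsc.stop();
    }
}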



// Note: this listing targets the Spark 1.x Java API (flatMap returns Iterable,
// leftOuterJoin uses Guava's Optional); in Spark 2.x+ flatMap returns an Iterator
// and Optional is org.apache.spark.api.java.Optional.
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.base.Optional;

import scala.Tuple2;

public class Demo01 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        //map(jsc);
        //filter(jsc);
        //flatMap(jsc);
        //groupByKey(jsc);
        //reduceByKey(jsc);
        //sortByKey(jsc);
        //join(jsc);
        leftOutJoin(jsc);

        jsc.stop();
    }

    // Multiply every element by 2 and print the result
    private static void map(JavaSparkContext jsc) {
        // data source
        List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        JavaRDD<Integer> numRDD = jsc.parallelize(lst);

        JavaRDD<Integer> resultRDD = numRDD.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer num) throws Exception {
                return num * 2;
            }
        });

        resultRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });
    }

    // Keep only the even numbers in the collection
    private static void filter(JavaSparkContext jsc) {
        // data source
        List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        JavaRDD<Integer> numRDD = jsc.parallelize(lst);

        System.out.println(numRDD.filter(new Function<Integer, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Integer num) throws Exception {
                return num % 2 == 0;
            }
        }).collect());
    }

    // Split lines of text into individual words
    private static void flatMap(JavaSparkContext jsc) {
        List<String> lst = Arrays.asList("hi tim ", "hello girl", "hello spark");

        JavaRDD<String> lines = jsc.parallelize(lst);

        JavaRDD<String> resultRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        System.out.println(resultRDD.collect());
    }

    // Group scores by class
    private static void groupByKey(JavaSparkContext jsc) {
        // Unlike Java, Scala does not split types into primitives and wrapper classes
        // (int vs. Integer); everything is an object, so its types have methods you can
        // call directly, helped by implicit conversions.
        // mock data
        List<Tuple2<String, Integer>> lst = Arrays.asList(
                new Tuple2<String, Integer>("class01", 100),
                new Tuple2<String, Integer>("class02", 101),
                new Tuple2<String, Integer>("class01", 199),
                new Tuple2<String, Integer>("class02", 121),
                new Tuple2<String, Integer>("class02", 120));

        JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = classRDD.groupByKey();

        groupedRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
                String classKey = tuple._1;
                Iterator<Integer> values = tuple._2.iterator();
                while (values.hasNext()) {
                    Integer value = values.next();
                    System.out.println("key:" + classKey + "\t" + "value:" + value);
                }
            }
        });
    }

    // Sum the scores of each class
    private static void reduceByKey(JavaSparkContext jsc) {
        List<Tuple2<String, Integer>> lst = Arrays.asList(
                new Tuple2<String, Integer>("class01", 100),
                new Tuple2<String, Integer>("class02", 101),
                new Tuple2<String, Integer>("class01", 199),
                new Tuple2<String, Integer>("class02", 121),
                new Tuple2<String, Integer>("class02", 120));

        JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);

        JavaPairRDD<String, Integer> resultRDD = classRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2);
            }
        });
    }

    // Take the top 3 students by score and print them
    // 1. sort with sortByKey, then take(3), then print
    private static void sortByKey(JavaSparkContext jsc) {
        List<Tuple2<String, Integer>> lst = Arrays.asList(
                new Tuple2<String, Integer>("tom", 60),
                new Tuple2<String, Integer>("kate", 80),
                new Tuple2<String, Integer>("kobe", 100),
                new Tuple2<String, Integer>("马蓉", 4),
                new Tuple2<String, Integer>("宋哲", 2),
                new Tuple2<String, Integer>("白百合", 3),
                new Tuple2<String, Integer>("隔壁老王", 1));

        JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);

        // swap to (score, name) so we can sort by score
        JavaPairRDD<Integer, String> pairRDD = classRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<Integer, String>(tuple._2, tuple._1);
            }
        });

        // sort by score descending so that take(3) returns the top 3
        JavaPairRDD<Integer, String> sortedRDD = pairRDD.sortByKey(false);

        // swap back to (name, score)
        JavaPairRDD<String, Integer> sortedRDD01 = sortedRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._2, tuple._1);
            }
        });

        // take is also an action
        List<Tuple2<String, Integer>> result = sortedRDD01.take(3);
        System.out.println(result);
    }

    // Sort students by score and take the top 2 (inner join version)
    private static void join(JavaSparkContext jsc) {
        // mock data
        List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "jack"),
                new Tuple2<Integer, String>(2, "rose"),
                new Tuple2<Integer, String>(3, "tom"),
                new Tuple2<Integer, String>(4, "赵丽颖"));

        JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names);

        List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(4, 100),
                new Tuple2<Integer, Integer>(2, 30));

        JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores);

        JavaPairRDD<Integer, Tuple2<Integer, String>> joinedRDD = num2scoresRDD.join(num2NamesRDD);

        // build (score, name) pairs so we can sort by score and take the top 2
        JavaPairRDD<Integer, String> score2NameRDD = joinedRDD.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, String>>, Integer, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<Integer, String>> tuple) throws Exception {
                Integer score = tuple._2._1;
                String name = tuple._2._2;
                return new Tuple2<Integer, String>(score, name);
            }
        });

        // After sortByKey you could run another mapToPair to convert back to (name, score)
        System.out.println(score2NameRDD.sortByKey(false).take(2));
    }

    // Improved version of the student-score example: keeps students with no score
    private static void leftOutJoin(JavaSparkContext jsc) {
        // mock data
        List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "jack"),
                new Tuple2<Integer, String>(2, "rose"),
                new Tuple2<Integer, String>(3, "tom"),
                new Tuple2<Integer, String>(4, "赵丽颖"));

        JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names);

        List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(4, 100),
                new Tuple2<Integer, Integer>(2, 30));

        JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores);

        // Note: for join it does not matter which RDD joins which, but leftOuterJoin
        // is order-sensitive (the left side keeps all of its keys)
        JavaPairRDD<Integer, Tuple2<String, Optional<Integer>>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);

        JavaPairRDD<Integer, String> pairRDD = joinedRDD.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<String, Optional<Integer>>>, Integer, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<String, Optional<Integer>>> tuple) throws Exception {
                String name = tuple._2._1;
                Optional<Integer> scoreOptional = tuple._2._2;
                Integer score;
                if (scoreOptional.isPresent()) {
                    score = scoreOptional.get();
                } else {
                    // no matching score: default to 0
                    score = 0;
                }
                return new Tuple2<Integer, String>(score, name);
            }
        });

        JavaPairRDD<Integer, String> sortedRDD = pairRDD.sortByKey(false);

        sortedRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, String> tuple) throws Exception {
                if (tuple._1 == 0) {
                    System.out.println("name:" + tuple._2 + "\t" + "score is 0, time to work harder");
                } else {
                    System.out.println("name:" + tuple._2 + "\t" + "score:" + tuple._1);
                }
            }
        });
    }
}
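
As a side note, on Java 8 or later the anonymous inner classes above can be written as lambdas, since Spark's function interfaces each have a single abstract method. A minimal sketch of the map and reduceByKey examples in that style, assuming the same JavaSparkContext jsc from Demo01:

// Lambda version of the map and reduceByKey examples (assumes Java 8+ and the jsc from Demo01)
JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
System.out.println(nums.map(num -> num * 2).collect());

JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("class01", 100),
        new Tuple2<String, Integer>("class02", 101),
        new Tuple2<String, Integer>("class01", 199)));
System.out.println(classRDD.reduceByKey((v1, v2) -> v1 + v2).collect());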

If you have any questions, feel free to discuss them in the comments. Criticism is welcome.
