sparkRDD 算子的创建和使用
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,spark是大数据领域近几年比较火的编程开发语言。有众多的好处,比如速度快,基于内存式计算框架。不多说直接讲 spark的RDD 算子的使用。如果有spark环境搭建等问题,请自行查找资料。本文不做讲
千家信息网最后更新 2025年12月03日sparkRDD 算子的创建和使用
spark是大数据领域近几年比较火的编程开发语言。有众多的好处,比如速度快,基于内存式计算框架。
不多说直接讲 spark的RDD 算子的使用。
如果有spark环境搭建等问题,请自行查找资料。本文不做讲述。
spark rdd的创建有两种方式:
1>从集合创建。也就是从父rdd继承过来
2>从外部创建。
import java.util.Arrays;import java.util.Iterator;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import com.google.common.base.Optional;import scala.Tuple2;public class Demo01 { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(conf); //map(jsc); //filter(jsc); // flatMap(jsc); //groupByKey(jsc); //reduceByKey(jsc); //sortByKey(jsc); //join(jsc); leftOutJoin(jsc); jsc.stop(); } //每一条元素 都乘以2,并且打印 private static void map(JavaSparkContext jsc) { //数据源 List lst = Arrays.asList(1,2,3,4,5,6,7,8); JavaRDD numRDD = jsc.parallelize(lst); JavaRDD resultRDD = numRDD.map(new Function() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer num) throws Exception { return num * 2; } }); resultRDD.foreach(new VoidFunction() { private static final long serialVersionUID = 1L; @Override public void call(Integer num) throws Exception { System.out.println(num); } }); } // 把集合中的偶数过滤出来 private static void filter(JavaSparkContext jsc) { //数据源 List lst = Arrays.asList(1,2,3,4,5,6,7,8); JavaRDD numRDD = jsc.parallelize(lst); System.out.println(numRDD.filter(new Function() { private static final long serialVersionUID = 1L; @Override public Boolean call(Integer num) throws Exception { return num % 2 ==0; } }).collect()); } //将一行行数据的单词拆分为一个个单词 private static void flatMap(JavaSparkContext jsc) { List lst = Arrays.asList("hi tim ","hello girl","hello spark"); JavaRDD lines = jsc.parallelize(lst); JavaRDD resultRDD = lines.flatMap(new FlatMapFunction() { private static final long serialVersionUID = 1L; @Override public Iterable call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); System.out.println(resultRDD.collect()); } // 根据班级进行分组 private static void groupByKey(JavaSparkContext jsc) { // int ,Integer // scala 里面的类型,没有像Java这样分为基本类型和包装类,因为scala是一种更加强的面向对象语言, //一切皆对象,里面的类型,也有对应的方法可以调用,隐式转换 // 模拟数据 @SuppressWarnings("unchecked") List> lst = Arrays.asList( new Tuple2("class01", 100), new Tuple2("class02",101), new Tuple2("class01",199), new Tuple2("class02",121), new Tuple2("class02",120)); JavaPairRDD cla***DD = jsc.parallelizePairs(lst); JavaPairRDD> groupedRDD = cla***DD.groupByKey(); groupedRDD.foreach(new VoidFunction>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2> tuple) throws Exception { String classKey = tuple._1; Iterator values = tuple._2.iterator(); while (values.hasNext()) { Integer value = values.next(); System.out.println("key:" + classKey + "\t" + "value:" + value); } } }); } private static void reduceByKey(JavaSparkContext jsc) { @SuppressWarnings("unchecked") List> lst = Arrays.asList( new Tuple2("class01", 100), new Tuple2("class02",101), new Tuple2("class01",199), new Tuple2("class02",121), new Tuple2("class02",120)); JavaPairRDD cla***DD = jsc.parallelizePairs(lst); JavaPairRDD resultRDD = cla***DD.reduceByKey(new Function2() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); resultRDD.foreach(new VoidFunction>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2 tuple) throws Exception { System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2); } }); } // 把学生的成绩前3名取出来,并打印 // 1.先排序sortByKey,然后take(3),再foreach private static void sortByKey(JavaSparkContext jsc) { @SuppressWarnings("unchecked") List> lst = Arrays.asList( new Tuple2("tom", 60), new Tuple2("kate",80), new Tuple2("kobe",100), new Tuple2("马蓉",4), new Tuple2("宋哲",2), new Tuple2("白百合",3), new Tuple2("隔壁老王",1)); JavaPairRDD cla***DD = jsc.parallelizePairs(lst); JavaPairRDD pairRDD = cla***DD.mapToPair(new PairFunction,Integer , String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Tuple2 tuple) throws Exception { return new Tuple2(tuple._2, tuple._1); } }); //do no JavaPairRDD sortedRDD = pairRDD.sortByKey(); JavaPairRDD sortedRDD01 = sortedRDD.mapToPair(new PairFunction, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Tuple2 tuple) throws Exception { return new Tuple2(tuple._2, tuple._1); } } ); // take 也是一个action操作 List> result = sortedRDD01.take(3); System.out.println(result); } private static void join(JavaSparkContext jsc) { // 模拟数据 @SuppressWarnings("unchecked") List> names =Arrays.asList( new Tuple2(1,"jack"), new Tuple2(2,"rose"), new Tuple2(3,"tom"), new Tuple2(4,"赵丽颖")); JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names); List> scores = Arrays.asList( new Tuple2(1,60), new Tuple2(4,100), new Tuple2(2,30)); JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores); JavaPairRDD> joinedRDD = num2scoresRDD.join(num2NamesRDD); //姓名成绩排序,取前2名 JavaPairRDD score2NameRDD = joinedRDD.mapToPair(new PairFunction>,Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call( Tuple2> tuple) throws Exception { Integer score = tuple._2._1; String name = tuple._2._2; return new Tuple2(score,name); } }); // sortByKey之后,你可以执行一个maptoPair的操作,转换为 System.out.println(score2NameRDD.sortByKey(false).take(2)); } // 学生成绩改良版 private static void leftOutJoin(JavaSparkContext jsc) { // 模拟数据 @SuppressWarnings("unchecked") List> names =Arrays.asList( new Tuple2(1,"jack"), new Tuple2(2,"rose"), new Tuple2(3,"tom"), new Tuple2(4,"赵丽颖")); JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names); List> scores = Arrays.asList( new Tuple2(1,60), new Tuple2(4,100), new Tuple2(2,30)); JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores); // num2scoresRDD num2NamesRDD //JavaPairRDD>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD); // 注意join,谁join谁,没区别,但是leftoutjoin 是有顺序的 JavaPairRDD>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD); JavaPairRDD pairRDD = joinedRDD.mapToPair(new PairFunction>>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2 call( Tuple2>> tuple) throws Exception { String name = tuple._2._1; Optional scoreOptional = tuple._2._2; Integer score = null; if(scoreOptional.isPresent()){ score= scoreOptional.get(); }else { score = 0; } return new Tuple2(score, name); } }); JavaPairRDD sortedRDD = pairRDD.sortByKey(false); sortedRDD.foreach(new VoidFunction>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2 tuple) throws Exception { if(tuple._1 == 0){ System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成绩0分" ); }else{ System.out.println("姓名:" + tuple._2 + "\t" + "分数:" + tuple._1); } } }); }} 如有疑问可跟帖讨论。欢迎拍砖
数据
成绩
类型
单词
姓名
学生
对象
数据源
语言
排序
算子
一行
也就是
偶数
元素
内存
分数
好处
方式
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库服务器大概多少钱
轻钢别墅龙骨设备加工软件开发
软件开发属于前端还是后端输出物
ftp文件共享服务器
山东综合软件开发直销价格
时序数据库论坛
服务器硬件安全
江西营销软件开发
征途服务器维护
网络技术售后服务方案
对淘宝网络安全的建议
达州市住建局数据库安全
网络安全系统补贴
中国移动网络技术类工作
LTE网络安全案例及分析
网络安全工作经费年度
忻州网络安全宣传周
itexam 未找到考试服务器
网络技术维护是什么
计算机网络设计和软件开发哪个好
山东综合软件开发直销价格
汕尾数据链软件开发报价行情
软件开发生命周期模型特点
软件开发企业即征即退政策
公安局网络安全检查表
迷你世界服务器被黑客给炸了
外派软件开发员怎么样
湖南服务器硬盘测评
ar眼镜软件开发
云服务器是一种虚拟技术吗