SparkSQL如何运用
发表于:2025-11-10 作者:千家信息网编辑
千家信息网最后更新 2025年11月10日,今天小编给大家分享一下SparkSQL如何运用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解
千家信息网最后更新 2025年11月10日SparkSQL如何运用
今天小编给大家分享一下SparkSQL如何运用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
一:SparkSQL
1.SparkSQL简介
Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。
2.SparkSQL运行原理
将Spark SQL转化为RDD,然后提交到集群执行。
3.SparkSQL特点
(1)容易整合,Spark SQL已经集成在Spark中
(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问
(3)兼容 Hive
(4)标准的数据连接:JDBC、ODBC
二、SparkSQL运用
package sqlimport org.apache.avro.ipc.specific.Personimport org.apache.sparkimport org.apache.spark.rdd.RDDimport org.apache.spark.sqlimport org.apache.spark.sql.catalyst.InternalRowimport org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import org.junit.Testclass Intro { @Test def dsIntro(): Unit ={ val spark: SparkSession = new sql.SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() //导入隐算是shi转换 import spark.implicits._ val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val personDS: Dataset[Person] =sourceRDD.toDS();//personDS.printSchema()打印出错信息 val resultDS: Dataset[Person] =personDS.where('age>10) .select('name,'age) .as[Person] resultDS.show() } @Test def dfIntro(): Unit ={ val spark: SparkSession =new SparkSession.Builder() .appName("ds intro") .master("local") .getOrCreate() import spark.implicits._ val sourceRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val df: DataFrame = sourceRDD.toDF()//隐shi转换 df.createOrReplaceTempView("person")//创建表 val resultDF: DataFrame =spark.sql("select name from person where age>=10 and age<=20") resultDF.show() } @Test def database1(): Unit ={ //1.创建sparkSession val spark: SparkSession =new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ //3.演示 val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val dataset: Dataset[Person] =sourceRDD.toDS() //Dataset 支持强类型的API dataset.filter(item => item.age >10).show() //Dataset 支持若弱类型的API dataset.filter('age>10).show() //Dataset 可以直接编写SQL表达式 dataset.filter("age>10").show() } @Test def database2(): Unit ={ val spark: SparkSession = new SparkSession.Builder() .master("local[6]") .appName("database2") .getOrCreate() import spark.implicits._ val dataset: Dataset[Person] =spark.createDataset(Seq(Person("张三",10),Person("李四",20))) //无论Dataset中放置的是什么类型的对象,最终执行计划中的RDD上都是internalRow //直接获取到已经分析和解析过得Dataset的执行计划,从中拿到RDD val executionRdd: RDD[InternalRow] =dataset.queryExecution.toRdd //通过将Dataset底层的RDD通过Decoder转成了和Dataset一样的类型RDD val typedRdd:RDD[Person] = dataset.rdd println(executionRdd.toDebugString) println() println() println(typedRdd.toDebugString) } @Test def database3(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF() //3.看看DataFrame可以玩出什么花样 //select name from... dataFrame.where('age > 10) .select('name) .show() }// @Test// def database4(): Unit = {// //1.创建sparkSession// val spark: SparkSession = new SparkSession.Builder()// .appName("database1")// .master("local[6]")// .getOrCreate()// //2.导入引入shi子转换// import spark.implicits._// val personList=Seq(Person("zhangsan",15),Person("lisi",20))//// //1.toDF// val df1: DataFrame =personList.toDF()// val df2: DataFrame =spark.sparkContext.parallelize(personList).toDF()// //2.createDataFrame// val df3: DataFrame =spark.createDataFrame(personList)//// //3.read// val df4: DataFrame =spark.read.csv("")// df4.show()// } //toDF()是转成DataFrame,toDs是转成Dataset // DataFrame就是Dataset[Row] 代表弱类型的操作,Dataset代表强类型的操作,中的类型永远是row,DataFrame可以做到运行时类型安全,Dataset可以做到 编译时和运行时都安全@Testdef database4(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val personList=Seq(Person("zhangsan",15),Person("lisi",20)) //DataFrame代表弱类型操作是编译时不安全 val df: DataFrame =personList.toDF() //Dataset是强类型的 val ds: Dataset[Person] =personList.toDS() ds.map((person:Person) =>Person(person.name,person.age))} @Test def row(): Unit ={ //1.Row如何创建,它是什么 //row对象必须配合Schema对象才会有列名 val p: Person =Person("zhangsan",15) val row: Row =Row("zhangsan",15) //2.如何从row中获取数据 row.getString(0) row.getInt(1) //3.Row也是样例类、 row match { case Row(name,age) => println(name,age) } }}case class Person(name: String, age: Int)以上就是"SparkSQL如何运用"这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注行业资讯频道。
类型
数据
就是
知识
篇文章
运行
代表
对象
方式
张三
李四
安全
内容
集群
统一
编译
不同
很大
中放
从中
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
radius服务器源代码
软件开发中复杂报表开发
征途手游怎么更换服务器
计算机网络安全课程大纲
千峰培训网络安全水平
上海上汽软件开发待遇
苏州多功能软件开发方法
bbc音频事件数据库
软件开发简历模版百度云
天龙八部茶马古道服务器人气
软件开发项目dmaic
服务器安全错误 》
网络安全计划表
软件开发员笔试真题
数据库中的故障种类有哪些
erp软件开发源代码
dell服务器鼠标键盘不通电
数据库怎么样解决高并发
软件开发用i5-6500
数据库建设要讯
网络安全电子信息
软件开发项目建设规范用表
软件开发人员事迹材料
软件开发小培训
数据库模型工程师
杭州安卓应用软件开发怎么收费
商用关系型数据库系统
安卓软件开发实践周士凯
美国网络安全与发展
签订软件开发合同引发