spark-sql的进阶案例
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,(1)骨灰级案例--UDTF求wordcount数据格式:每一行都是字符串并且以空格分开。代码实现:object SparkSqlTest { def main(args: Array[Stri
千家信息网最后更新 2025年12月02日spark-sql的进阶案例
(1)骨灰级案例--UDTF求wordcount
数据格式:
每一行都是字符串并且以空格分开。
代码实现:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext val wordDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt").toDF("line") wordDF.createTempView("lines") val sql= """ |select t1.word,count(1) counts |from ( |select explode(split(line,'\\s+')) word |from lines) t1 |group by t1.word |order by counts """.stripMargin spark.sql(sql).show() }}结果:
(2)窗口函数求topN
数据格式:
取每门课程中成绩最好的前三
代码实现:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext val topnDF: DataFrame = sqlContext.read.json("C:\\z_data\\test_data\\score.json") topnDF.createTempView("student") val sql= """select |t1.course course, |t1.name name, |t1.score score |from ( |select |course, |name, |score, |row_number() over(partition by course order by score desc ) top |from student) t1 where t1.top<=3 """.stripMargin spark.sql(sql).show() }}结果:
(3)SparkSQL去处理DataSkew数据倾斜的问题
思路: (使用两阶段的聚合)
- 找到发生数据倾斜的key
- 对发生倾斜的数据的key进行拆分
- 做局部聚合
- 去后缀
- 全局聚合
以上面的wordcount为例,找出相应的数据量比较大的单词
代码实现:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //注册UDF sqlContext.udf.register[String,String,Integer]("add_prefix",add_prefix) sqlContext.udf.register[String,String]("remove_prefix",remove_prefix) //创建sparkContext对象 val sc: SparkContext = spark.sparkContext val lineRDD: RDD[String] = sc.textFile("C:\\z_data\\test_data\\ip.txt") //找出数据倾斜的单词 val wordsRDD: RDD[String] = lineRDD.flatMap(line => { line.split("\\s+") }) val sampleRDD: RDD[String] = wordsRDD.sample(false,0.2) val sortRDD: RDD[(String, Int)] = sampleRDD.map(word=>(word,1)).reduceByKey(_+_).sortBy(kv=>kv._2,false) val hot_word = sortRDD.take(1)(0)._1 val bs: Broadcast[String] = sc.broadcast(hot_word) import spark.implicits._ //将数据倾斜的key打标签 val lineDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt") val wordDF: Dataset[String] = lineDF.flatMap(row => { row.getAs[String](0).split("\\s+") }) //有数据倾斜的word val hotDS: Dataset[String] = wordDF.filter(row => { val hot_word = bs.value row.equals(hot_word) }) val hotDF: DataFrame = hotDS.toDF("word") hotDF.createTempView("hot_table") //没有数据倾斜的word val norDS: Dataset[String] = wordDF.filter(row => { val hot_word = bs.value !row.equals(hot_word) }) val norDF: DataFrame = norDS.toDF("word") norDF.createTempView("nor_table") var sql= """ |(select |t3.word, |sum(t3.counts) counts |from (select |remove_prefix(t2.newword) word, |t2.counts |from (select |t1.newword newword, |count(1) counts |from |(select |add_prefix(word,3) newword |from hot_table) t1 |group by t1.newword) t2) t3 |group by t3.word) |union |(select | word, | count(1) counts |from nor_table |group by word) """.stripMargin spark.sql(sql).show() } //自定义UDF加前缀 def add_prefix(word:String,range:Integer): String ={ val random=new Random() random.nextInt(range)+"_"+word } //自定义UDF去除后缀 def remove_prefix(word:String): String ={ word.substring(word.indexOf("_")+1) }}结果:
数据
对象
入口
日志
结果
编程
代码
单词
后缀
格式
案例
一行
全局
函数
前缀
字符
字符串
局部
思路
成绩
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
最容易考的网络安全证书是啥
网络安全情况调查
网络安全联盟联系方式
乙方网络安全宣贯
安徽互联网软件开发公司
网络安全罚款条例
吉林潮流软件开发服务推广
医保代码数据库动态维护上班时间
建明长城数据库视频
数据库 字段 非法字符串
软件开发合同书范本
石家庄热巢网络技术
花都软件开发哪家不错
青少年网络安全必要性
卵菌功能分类数据库
苏州小程序软件开发教程
上海探索网络安全解决之道
网络安全员和工程师的区别
浪潮服务器机柜
网络安全法域外适用效力
服务器错误怎么回事啊
dns域名解析服务器
网络安全教育平台浙江
连接数据库的三个包是
数据库立即执行定时清除任务
coturn如何配置数据库
徐州机电软件开发服务电话
数据库原理名词解释系统故障
杭州网络技术客服电话
小米11无线网络技术第几代