Spark RDD转换成DataFrame的两种方式
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,Spark SQL支持两种方式将现有RDD转换为DataFrame。第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提
千家信息网最后更新 2025年12月01日Spark RDD转换成DataFrame的两种方式
Spark SQL支持两种方式将现有RDD转换为DataFrame。
第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。
第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet
方法如下 1.将RDD转换成Rows 2.按照第一步Rows的结构定义StructType 3.基于rows和StructType使用createDataFrame创建相应的DF测试数据为order.data
1 小王 电视 12 2015-08-01 09:08:311 小王 冰箱 24 2015-08-01 09:08:142 小李 空调 12 2015-09-02 09:01:31代码如下:
object RDD2DF { /** * 主要有两种方式 * 第一种是在已经知道schema已经知道的情况下,我们使用反射把RDD转换成DS,进而转换成DF * 第二种是你不能提前定义好case class,例如数据的结构是以String类型存在的。我们使用接口自定义一个schema * @param args */ def main(args: Array[String]): Unit = { val spark=SparkSession.builder() .appName("DFDemo") .master("local[2]") .getOrCreate()// rdd2DFFunc1(spark) rdd2DFFunc2(spark) spark.stop() } /** * 提前定义好case class * @param spark */ def rdd2DFFunc1(spark:SparkSession): Unit ={ import spark.implicits._ val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data") val orderDF=orderRDD.map(_.split("\t")) .map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4))) .toDF() orderDF.show() Thread.sleep(1000000) } /** *总结:第二种方式就是通过最基础的DF接口方法,将 * @param spark */ def rdd2DFFunc2(spark:SparkSession): Unit ={ //TODO: 1.将RDD转换成Rows 2.按照第一步Rows的结构定义StructType 3.基于rows和StructType使用createDataFrame创建相应的DF val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data") //TODO: 1.将RDD转换成Rows val rowsRDD=orderRDD// .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res}) .map(_.split("\t")) .map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4))) //TODO: 2.按照第一步Rows的结构定义StructTypeval schemaString="id|name|commodity|age|date" val fields=schemaString.split("\\|") .map(filedName=>StructField(filedName,StringType,nullable = true)) val schema=StructType(fields) //TODO: 3.基于rows和StructType使用createDataFrame创建相应的DF val orderDF= spark.createDataFrame(rowsRDD,schema) orderDF.show() orderDF.groupBy("name").count().show() orderDF.select("name","commodity").show() Thread.sleep(10000000) }}case class Order(id:String,name:String,commodity:String,age:String,date:String)生产中创建DataFrame代码举例
在实际生产环境中,我们其实选择的是方式二这种进行创建DataFrame的,因为我们生产中很难提前定义case class ,因为业务处理之后字段常常会发生意想不到的变化,所以一定要掌握这种方法。
测试数据
baidu CN A E [01/May/2018:02:15:52 +0800] 2 61.237.59.0 - 112.29.213.35:80 0 movieshow2000.edu.chinaren.com GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 16374 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568baidu CN A E [01/May/2018:02:25:33 +0800] 2 61.232.37.228 - 112.29.213.35:80 0 github.com GET http://github.com/user_upload/15316339776271/44y.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 83552 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568Schema方法类
import org.apache.spark.sql.Rowimport org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}object LogConverUtil { privateval struct=StructType( Array( StructField("domain",StringType) ,StructField("url",StringType) ,StructField("pv",LongType) ,StructField("traffic",LongType) ,StructField("date",StringType) ) ) def getStruct():StructType={ struct } def parseLog(logLine:String): Row ={ val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH) val targetFormat=new SimpleDateFormat("yyyyMMddhh") try{ val fields=logLine.split("\t") val domain=fields(10) val url=fields(12) val pv=1L val traffic=fields(19).trim.toLong val date=getFormatedDate(fields(4),sourceFormat,targetFormat) Row(domain,url,pv,traffic,date) }catch { case e:Exception=>Row(0) } } /** * * @param sourceDate Log中的未格式化日期 [01/May/2018:01:09:45 +0800] * @return 按照需求格式化字段 2018050101 */ def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = { val targetTime=targetFormat.format(sourceFormat.parse(sourceDate)) targetTime }}RDD2DataFrame主类
import org.apache.spark.sql.SparkSessionobject SparkCleanJob { def main(args: Array[String]): Unit = { val spark=SparkSession.builder() .master("local[2]") .appName("SparkCleanJob") .getOrCreate() val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log")// logRDD.take(2).foreach(println(_)) //调用LogConverUtil里的parseLog方法和getStruct方法获得Rows对象和StructType对象 val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct()) logDF.show(false) logDF.printSchema() }}结果
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+|domain |url |pv |traffic|date |+------------------------------+-------------------------------------------------------------------------+---+-------+----------+|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 |1 |16374 |2018050102||github.com |http://github.com/user_upload/15316339776271/44y.mp4 |1 |83552 |2018050102||yooku.com |http://yooku.com/user_upload/15316339776271x0.html |1 |74986 |2018050101||rw.uestc.edu.cn |http://rw.uestc.edu.cn/user_upload/15316339776271515.mp4 |1 |55297 |2018050101||github.com |http://github.com/user_upload/15316339776271x05.mp4 |1 |26812 |2018050102||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y4.html |1 |50392 |2018050103||github.com |http://github.com/user_upload/15316339776271x15.html |1 |40092 |2018050101||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/153163397762714z.mp4 |1 |8368 |2018050102||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/5z.html |1 |29677 |2018050103||rw.uestc.edu.cn |http://rw.uestc.edu.cn/user_upload/153163397762710w.mp4 |1 |26124 |2018050102||yooku.com |http://yooku.com/user_upload/15316339776271yz.mp4 |1 |32219 |2018050101||yooku.com |http://yooku.com/user_upload/153163397762713w.html |1 |90389 |2018050101||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271z/.html |1 |15623 |2018050101||yooku.com |http://yooku.com/user_upload/1531633977627142.html |1 |53453 |2018050103||yooku.com |http://yooku.com/user_upload/15316339776271230.mp4 |1 |20309 |2018050102||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/4w1.html|1 |87804 |2018050103||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y5y.html |1 |69469 |2018050103||yooku.com |http://yooku.com/user_upload/15316339776271011/.mp4 |1 |3782 |2018050103||github.com |http://github.com/user_upload/15316339776271wzw.mp4 |1 |89642 |2018050102||github.com |http://github.com/user_upload/15316339776271/1/.mp4 |1 |63551 |2018050103|+------------------------------+-------------------------------------------------------------------------+---+-------+----------+only showing top 20 rowsroot |-- domain: string (nullable = true) |-- url: string (nullable = true) |-- pv: long (nullable = true) |-- traffic: long (nullable = true) |-- date: string (nullable = true)Process finished with exit code 0注:除了这种使用RDD读取文本进而转化成DataFrame之外,我们也会使用自定义DefaultSource来直接将text转化成DataFrame
方法
方式
结构
接口
数据
类型
反射
生产
代码
字段
对象
情况
是在
格式
生产中
小李
小王
应用
测试
简便
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全构架
数据库服务器招标参数
网络安全仍需加强
湖南宁可互联网科技公司
幼儿园网络安全管理结构图
玉林市总工会网络技术部杨东
京东挂豆服务器搭建
中国平安综合金融互联网科技
服务器的主要作用和特点
svn 数据库实现
与艾尔登法环服务器断开连接
网页服务器被恶意攻击怎么办
c 服务器资源管理器
mc服务器管理隐身指令
清华大学软件开发教材
数据库文件备份的扩展名
我的世界服务器计分板文件在哪
高级数据库技术难吗
手机网络安全管控意见建议
tcp 服务器安全性
如何查邮箱的服务器地址
非你莫属最牛软件开发
摄像头网络安全事件
做软件开发的简称什么意思
银行软件开发环境
数据库迁移到数据盘
沧州人工智能软件开发
数据库讲座宣传稿怎么写
鼎捷erp软件开发
免费 数据库 空间