11. Spark SQL: Converting an RDD to a Dataset


Introduction

  Spark SQL provides two ways to convert an RDD into a Dataset.

  • Infer the RDD's schema using reflection

  Use this approach when the Spark application can infer the structure of the RDD's data. The reflection-based method makes the code more concise and effective.

  • Construct a schema through a programmatic interface, then apply it to the RDD

  Use this approach when the Spark application cannot infer the RDD's structure, for example when the schema is only known at run time. (A compact sketch contrasting the two approaches follows this list.)
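
  Before the language-specific examples below, the following compact Scala sketch contrasts the two approaches side by side. It is only an illustration: the Person case class, the inline sample data, and the local SparkSession setup are assumptions made here, not part of the official examples.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

case class Person(name: String, age: Long)

val spark = SparkSession.builder().appName("rdd-to-dataset").master("local[*]").getOrCreate()
import spark.implicits._

// Source RDD of raw (name, age) pairs
val rdd = spark.sparkContext.parallelize(Seq(("Andy", 32L), ("Justin", 19L)))

// 1. Reflection: mapping to a case class lets Spark derive the schema
val byReflection = rdd.map { case (n, a) => Person(n, a) }.toDF()

// 2. Programmatic: build a StructType by hand and pair it with an RDD[Row]
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true)))
val byProgram = spark.createDataFrame(rdd.map { case (n, a) => Row(n, a) }, schema)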

Reflection-based approach

  • scala
// A case class defines the schema; Spark infers column names and types
// from its fields via reflection
case class Person(name: String, age: Long)

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
// (A typed Dataset variant of this example is sketched after this section.)
  • java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Person is assumed to be a JavaBean with name and age fields
// plus the corresponding getters and setters

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
  • python
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
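
  The Scala example above ends with an untyped DataFrame. Since this entry is about RDD-to-Dataset conversion, note that the same reflection machinery can also produce a typed Dataset directly via toDS(). A minimal sketch, assuming the Person case class from the Scala example and an existing SparkSession named spark:

import spark.implicits._

// toDS() uses the implicitly derived Encoder[Person] (reflection over the
// case class fields) to build a typed Dataset[Person] instead of a DataFrame
val peopleDS = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDS()

// Typed operations compile against Person's fields
peopleDS.filter(_.age >= 13).show()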

Programmatic approach

  • scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// For the implicit Encoder used by results.map below
import spark.implicits._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
// (A mixed-type schema variant is sketched after this section.)
  • java
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
  • python
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
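
  In the Scala example above, every field is declared as StringType. When the actual column types are known at run time, the same pattern extends naturally: map each field description to a proper DataType before assembling the StructType. A hedged sketch, where the typeFor helper and the "name:type" spec format are illustrative assumptions, not Spark APIs:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical helper: choose a Spark DataType from a textual tag
def typeFor(tag: String): DataType = tag match {
  case "int"  => IntegerType
  case "long" => LongType
  case _      => StringType
}

// Schema spec in an assumed "name:type" format, e.g. read from a config file
val schemaSpec = "name:string age:int"
val schema = StructType(schemaSpec.split(" ").map { col =>
  val Array(n, t) = col.split(":")
  StructField(n, typeFor(t), nullable = true)
})

// Row values must now match the declared types, so age is cast to Int
val rowRDD = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(a => Row(a(0), a(1).trim.toInt))

val peopleDF = spark.createDataFrame(rowRDD, schema)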

Devoted to technology, passionate about sharing. Follow the WeChat public account java大数据编程 for more technical content.
