sparl sql有哪些
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、读取json格式的文件创建DataFramejava (spark1.6
千家信息网最后更新 2025年12月02日sparl sql有哪些
这篇文章给大家分享的是有关sparl sql有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
1、读取json格式的文件创建DataFrame
java (spark1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("javaSpark01"); SparkContext sc = new SparkContext(conf); SQLContext sqlContext = new SQLContext(sc);// Dataset df = sqlContext.read().format("json").load("G:/idea/scala/spark02/json"); Dataset df2 = sqlContext.read().json("G:/idea/scala/spark02/json"); df2.show(); //树形的形式显示schema信息 df2.printSchema(); //注册临时表 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘 df2.registerTempTable("baidukt_table"); Dataset sql = sqlContext.sql("select * from baidukt_table"); sql.show(); Dataset sql1 = sqlContext.sql("select age,count(1) from baidukt_table group by age"); sql1.show(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("Spark08 1.6") val sc = new SparkContext(conf) val sqlContext: SQLContext = new SQLContext(sc) val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")// val df1 = sqlContext.read.json("G:/idea/scala/spark02/json") //显示前50行数据 df.show(50) //树形的形式显示schema信息 df.printSchema() //注册临时表 df.registerTempTable("baidukt_com_table") val result = sqlContext.sql("select age,count(1) from baidukt_com_table group by age") result.show() val result1 = sqlContext.sql("select * from baidukt_com_table") result1.show() sc.stop() }java (spark 2.0++)
public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++"); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); Dataset df = spark.read().json("G:/idea/scala/spark02/json");// Dataset df1 = spark.read().format("json").load("G:/idea/scala/spark02/json"); df.show(); df.printSchema(); df.createOrReplaceGlobalTempView("baidu_com_spark2"); Dataset resut = spark.sql("select * from baidu_com_spark2"); resut.show(); spark.stop(); }
scala(spark 2.0++)
def main(args: Array[String]): Unit = { //用户的当前工作目录// val location = System.setProperties("user.dir","spark_2.0" val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]") val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //数据导入方式 val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")// val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json") //查看表 df.show() //查看表 df.printSchema() //直接使用spark SQL进行查询 //先注册为临时表 //createOrReplaceTempView:创建临时视图,此视图的生命周期与用于创建此数据集的[SparkSession]相关联。 //createGlobalTempView:创建全局临时视图,此时图的生命周期与Spark Application绑定。 df.createOrReplaceTempView("people") val result: DataFrame = spark.sql("select * from people") result.show() spark.stop() }2、通过json格式的RDD创建DataFrame
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("jsonRDD"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD data = sc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"score\":\"100\"}", "{\"name\":\"lisi\",\"score\":\"200\"}", "{\"name\":\"wangwu\",\"score\":\"300\"}" )); Dataset df = sqlContext.read().json(data); df.show(); df.printSchema(); df.createOrReplaceTempView("baidu_com_spark2"); Dataset resut = sqlContext.sql("select * from baidu_com_spark2"); resut.show(); sc.stop(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark10") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val data: RDD[String] = sc.parallelize(Array( "{\"name\":\"zhangsan\",\"age\":18}", "{\"name\":\"lisi\",\"age\":19}", "{\"name\":\"wangwu\",\"age\":20}" )) val df = sqlContext.read.json(data) df.show() df.printSchema() df.createOrReplaceTempView("baidukt_com_spark1.6") val result = sqlContext.sql("select * from baidukt_com_spark1.6") result.show() result.printSchema() sc.stop() }3、非json格式的RDD创建DataFrame
3.1 通过反射的方式将非json格式的RDD转换成DataFrame(不推荐,所以不复制代码过来了)
3.2、态创建Schema将非json格式的RDD转换成DataFrame
4、读取parquet文件创建DataFrame(多次io 不推荐)
5、读取JDBC中的数据创建DataFrame(MySql为例)
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("mysql"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); /** * 第一种方式读取MySql数据库表,加载为DataFrame */ Map options = new HashMap(); options.put("url", "jdbc:mysql://localhost:3306/spark");//连接地址和数据库名称 options.put("driver", "com.mysql.jdbc.Driver");//驱动 options.put("user", "root");//用户名 options.put("password", "admin");//密码 options.put("dbtable", "person");//表 Dataset person = sqlContext.read().format("jdbc").options(options).load(); person.show(); //注册临时表 person.registerTempTable("person"); /** * 第二种方式读取MySql数据表加载为DataFrame */ DataFrameReader reader = sqlContext.read().format("jdbc"); reader.option("url", "jdbc:mysql://localhost:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "admin"); reader.option("dbtable", "score"); Dataset score = reader.load(); score.show(); score.registerTempTable("score"); Dataset result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name"); result.show(); /** * 将DataFrame结果保存到Mysql中 */ Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "admin"); result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties); sc.stop(); }
scala (spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("mysql") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * 第一种方式读取Mysql数据库表创建DF */ val options = new mutable.HashMap[String,String](); options.put("url", "jdbc:mysql://localhost:3306/spark") options.put("driver","com.mysql.jdbc.Driver") options.put("user","root") options.put("password", "admin") options.put("dbtable","person") val person = sqlContext.read.format("jdbc").options(options).load() person.show() person.registerTempTable("person") /** * 第二种方式读取Mysql数据库表创建DF */ val reader = sqlContext.read.format("jdbc") reader.option("url", "jdbc:mysql://localhost:3306/spark") reader.option("driver","com.mysql.jdbc.Driver") reader.option("user","root") reader.option("password","admin") reader.option("dbtable", "score") val score = reader.load() score.show() score.registerTempTable("score") val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name") result.show() /** * 将数据写入到Mysql表中 */ val properties = new Properties() properties.setProperty("user", "root") properties.setProperty("password", "admin") result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties) sc.stop() }6、读取Hive中的数据加载成DataFrame
HiveContext是SQLContext的子类,连接Hive建议使用HiveContext。
java(spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("hive"); JavaSparkContext sc = new JavaSparkContext(conf); //HiveContext是SQLContext的子类。 HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("USE spark"); hiveContext.sql("DROP TABLE IF EXISTS student_infos"); //在hive中创建student_infos表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' "); hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos"); hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'"); hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores INTO TABLE student_scores"); /** * 查询表生成DataFrame */ Dataset goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80"); hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.registerTempTable("goodstudent"); Dataset result = hiveContext.sql("select * from goodstudent"); result.show(); result.printSchema(); /** * 将结果保存到hive表 good_student_infos */ goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos"); Row[] goodStudentRows = hiveContext.table("good_student_infos").collect(); for(Row goodStudentRow : goodStudentRows) { System.out.println(goodStudentRow); } sc.stop(); }
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("HiveSource") val sc = new SparkContext(conf) /** * HiveContext是SQLContext的子类。 */ val hiveContext = new HiveContext(sc) hiveContext.sql("use spark") hiveContext.sql("drop table if exists student_infos") hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos") hiveContext.sql("drop table if exists student_scores") hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores") val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name") hiveContext.sql("drop table if exists good_student_infos") /** * 将结果写入到hive表中 */ df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos") sc.stop() }7、自定义udf
scala(spark 1.6)
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark13") val spark = SparkSession.builder().config(conf).getOrCreate() //rdd转df val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan","wangwu","masi")) val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_)) val schema = DataTypes.createStructType(Array(StructField("name",StringType,true))) val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,schema) df.show(50) df.printSchema() df.createOrReplaceTempView("test") //自定义udf函数 函数名为StrLen,参数为String、Int String有问题,网上说需要java.lang.String类型 // spark.sqlContext.udf.register("StrLen",(s:String,i:Int)=>{s.length+i})// spark.sqlContext.udf.register("StrLen",(i:Int)=>{i})// spark.sql("select name ,StrLen(name,10) as length from test").show(20) spark.sql("select name ,StrLen(10) as length from test").show(20) }java (spark 1.6)
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("udf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu")); JavaRDD rowRDD = parallelize.map(new Function() { private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List fields = new ArrayList(); fields.add(DataTypes.createStructField("name", DataTypes.StringType,true)); StructType schema = DataTypes.createStructType(fields); Dataset df = sqlContext.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx */ sqlContext.udf().register("StrLen", new UDF1() { private static final long serialVersionUID = 1L; @Override public Integer call(String t1) throws Exception { return t1.length(); } }, DataTypes.IntegerType); sqlContext.sql("select name ,StrLen(name) as length from user").show(); sc.stop(); }
感谢各位的阅读!关于"sparl sql有哪些"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
数据
方式
格式
数据库
函数
子类
结果
视图
信息
内容
参数
周期
形式
文件
更多
树形
生命
用户
篇文章
推荐
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
徐州江苏高性能服务器规格
web服务器会话管理功能
服务器管理ip如何设置
c 动态查询数据库连接
UNIX软件开发工程师招聘
长宁区专业网络技术检修
恩牛网络技术有限公司销售
商品交易软件开发
空间数据库 选择题
vb读取mysql数据库
佳星网络技术有限公司
什么叫idc服务器
中国人网络安全故事
新建查询创建数据库
世界互联网大会黑科技无线电
高中网络技术考试重要知识点
什么是服务器异步测试
河南有哪些网络技术服务
csgo新加坡服务器
软件开发应届本科毕业生工资
c http 服务器
服务器风扇是多少分贝
东莞软件开发驻场收费标准
警察网络安全管理培训课
手机网页服务器
aliyun的发件服务器设置
未来5年网络安全市场
数据库跳跃式扫描
淮南软件视频系统服务器
服务器下载速度慢上传速度快