RDS与POLARDB归档到X-Pack Spark计算的方法
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家
千家信息网最后更新 2025年12月01日RDS与POLARDB归档到X-Pack Spark计算的方法
本篇内容介绍了"RDS与POLARDB归档到X-Pack Spark计算的方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。
RDS & POLARDB分表归档到X-Pack Spark步骤
一键关联POLARDB到Spark集群
POLARDB表存储
在database 'test1'中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、...
具体的建表语句如下:
*请左右滑动阅览
CREATE TABLE `test1` ( `a` int(11) NOT NULL, `b` time DEFAULT NULL, `c` double DEFAULT NULL, PRIMARY KEY (`a`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
归档到Spark的调试
x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。
1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。
2、创建交互式查询
以pyspark为例,下面是具体归档demo的代码:
*请左右滑动阅览
spark.sql("drop table sparktest").show()# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区# CREATE TABLE `test1` (# `a` int(11) NOT NULL,# `b` time DEFAULT NULL,# `c` double DEFAULT NULL,# PRIMARY KEY (`a`)# ) ENGINE=InnoDB DEFAULT CHARSET=utf8for num in range(1, 4): #构造polardb的表名 dbtable = "test1." + "test" + str(num) #spark外表关联polardb对应的表 externalPolarDBTableNow = spark.read \ .format("jdbc") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \ .option("dbtable", dbtable) \ .option("user", "name") \ .option("password", "xxx*") \ .load().registerTempTable("polardbTableTemp") #生成本次polardb表数据要写入的spark表的分区信息 (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num)) #执行导数据sql spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s ) " "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show() #删除临时的spark映射polardb表的catalog spark.catalog.dropTempView("polardbTableTemp") #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除 spark.sql("show partitions sparktest").show(1000, False) spark.sql("select count(*) from sparktest").show()归档作业上生产
交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:
/polardb/polardbArchiving.py 内容如下:
*请左右滑动阅览
# -*- coding: UTF-8 -*-from __future__ import print_functionimport sysfrom operator import addfrom pyspark.sql import SparkSessionif __name__ == "__main__": spark = SparkSession \ .builder \ .appName("PolardbArchiving") \ .enableHiveSupport() \ .getOrCreate() spark.sql("drop table sparktest").show() # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致 spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) " "USING parquet PARTITIONED BY (dt ,hh ,mm )").show() #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL, # `b` time DEFAULT NULL, # `c` double DEFAULT NULL, # PRIMARY KEY (`a`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8 for num in range(1, 4): #构造polardb的表名 dbtable = "test1.">"RDS与POLARDB归档到X-Pack Spark计算的方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
存储
查询
交互式
作业
内容
方法
一致
例子
字段
实际
小时
方式
更多
知识
类型
过程
建一
关联
处理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全中常见的攻击方式
软件开发合同时间分析
数据库实体关系识别
中国网络技术有限公司排行
web服务器应用研究毕业设计
网络安全协议的核心功能
低压电源检测软件开发
顺义区专业性软件开发优势
足球小将数据库2008
全局数据库名到哪找
软件开发包括什么专业
阿里云的git服务器
奉贤区提供软件开发服务要求
燕山大学web数据库
北交 网络安全
网络技术开发价目表
网络安全板块儿
软件开发培训学校汽车之家
服务器 pci插槽
奉贤区网络技术服务有哪些
服务器和网络哪个安全
沭阳巨型网络技术价格查询
西双版纳回收服务器
app软件开发公司
南沙网络安全建设怎么收费
新浪nba手机网数据库
软件开发售后服务一般几年
网络安全审查员证书要求
如何宣传网络安全教育平台
武汉微信软件开发