spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库
发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是
千家信息网最后更新 2025年12月04日spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库
学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark
以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库:
package com.twq.javaapi.java7;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.rdd.JdbcRDD;import java.io.Serializable;import java.sql.*;public class JavaJdbcRDDSuite implements Serializable { public static void prepareData() throws ClassNotFoundException, SQLException { //使用本地数据库derby,当然可以使用mysql等关系型数据库 Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); try { //创建一张表FOO,ID是一个自增的主键,DATA是一个INTEGER列 Statement create = connection.createStatement(); create.execute( "CREATE TABLE FOO(" + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + "DATA INTEGER)"); create.close(); //插入数据 PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)"); for (int i = 1; i <= 5; i++) { insert.setInt(1, i * 2); insert.executeUpdate(); } insert.close(); } catch (SQLException e) { // If table doesn't exist... if (e.getSQLState().compareTo("X0Y32") != 0) { throw e; } } finally { connection.close(); } } public static void shutdownDB() throws SQLException { try { DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true"); } catch (SQLException e) { // Throw if not normal single database shutdown // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html if (e.getSQLState().compareTo("08006") != 0) { throw e; } } } public static void main(String[] args) throws Exception { JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite"); //准备数据 prepareData(); //构建JdbcRDD JavaRDD rdd = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { @Override public Connection getConnection() throws SQLException { return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"); } }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 5, 1, new Function() { @Override public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); //结果: [2, 4, 6, 8, 10] System.out.println(rdd.collect()); shutdownDB(); sc.stop(); }} 详细了解RDD的api的话,可以参考: spark core RDD api原理详解
数据
数据库
参考
原理
技术
结果
建一
准备
学习
由浅入深
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库如何查找隐藏表
服务器类型英语
网络安全类型none是什么意思
关于网络安全胡乱追星的手抄报
网络技术指导合同范本
如果取消网络安全证书
怎么看路由器服务器时间
长沙酒店软件开发多少钱
我的世界末日服务器
软件开发被分为哪几个阶段
服务器与硬盘连接线
怎么用向导创建数据库
软件开发紧缺人才
云数据库怎么管理软件
搞网络安全的叫什么小宏的人
株洲软件开发培训哪家最好
成都中国网络安全论坛
极限生存手机版服务器
气象局加强网络安全和数据管理
阳江电信服务器
网络技术做实验的软件叫什么
服务器空间无法启动
软件开发致富
网络安全系统组建
迅雷无法连接服务器
数据库的安全目不包括
古地图数据库
深圳快享互联网科技有限公司
锐捷网络安全产品事业部
relay非数据管理服务器