Flink Connectors怎么连接MySql
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎
千家信息网最后更新 2025年12月02日Flink Connectors怎么连接MySql
这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎么连接MySql"吧!
通过使用Flink DataStream Connectors 数据流连接器连接到Mysql数据源,并基于JDBC提供数据流输入与输出操作
示例环境
java.version: 1.8.xflink.version: 1.11.1mysql:5.7.x
数据流输入
DataStreamSource.java
package com.flink.examples.mysql;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.java.io.jdbc.JDBCOptions;import org.apache.flink.api.java.io.jdbc.JDBCTableSource;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 将mysql表中数据查询输出到DataStream流中 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查询sql String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user"; //设置表视图字段与类型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //配置jdbc数据源选项 JDBCOptions jdbcOptions = JDBCOptions.builder() .setDriverName(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setTableName("t_user") .build(); JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build(); //将数据源注册到tableEnv视图student中 tEnv.registerTableSource("t_user", jdbcTableSource); Table table = tEnv.sqlQuery(sql); DataStream sourceStream = tEnv.toAppendStream(table, TUser.class); sourceStream.map((t)->new Gson().toJson(t)).print(); env.execute("flink mysql source"); }} 数据流输出
DataStreamSink.java
package com.flink.examples.mysql;import com.flink.examples.TUser;import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.sinks.TableSink;import static org.apache.flink.table.api.Expressions.$;/** * @Description 将DataStream数据流插入到mysql表中 */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查询sql String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)"; //封装数据 TUser user = new TUser(); user.setId(0); user.setName("zhao1"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(System.currentTimeMillis()); DataStream sourceStream = env.fromElements(user); //从DataStream获取数据// Expression id = ExpressionParser.parse_Expression("id");// Expression name = ExpressionParser.parse_Expression("name");// Expression age = ExpressionParser.parse_Expression("age");// Expression sex = ExpressionParser.parse_Expression("sex");// Expression address = ExpressionParser.parse_Expression("address");// Expression createTimeSeries = ExpressionParser.parse_Expression("createTimeSeries");// Table table = tEnv.fromDataStream(sourceStream, id, name, age, sex, address, createTimeSeries ); Table table = tEnv.fromDataStream(sourceStream,$("id"),$("name"),$("age"),$("sex"),$("address"),$("createTimeSeries")); //输出到mysql //设置表视图字段与类型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //设置sink输出jdbc TableSink tableSink = JDBCAppendTableSink.builder() .setDrivername(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setQuery(sql) .setParameterTypes(tableSchema.getFieldTypes()) .setBatchSize(100) .build(); //将数据源注册到tableEnv视图result中 tEnv.registerTableSink("result", tableSchema.getFieldNames(), tableSchema.getFieldTypes(), tableSink); //在指定的路径下注册,然后执行插入操作 table.executeInsert("result"); }} 数据源配置类
MysqlConfig.java
package com.flink.examples.mysql;/** * @Description Mysql数据库连接配置 */public class MysqlConfig { public final static String DRIVER_CLASS="com.mysql.jdbc.Driver"; public final static String SOURCE_DRIVER_URL="jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false"; public final static String SOURCE_USER="root"; public final static String SOURCE_PASSWORD="root";}数据展示
感谢各位的阅读,以上就是"Flink Connectors怎么连接MySql"的内容了,经过本文的学习后,相信大家对Flink Connectors怎么连接MySql这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
数据流
数据源
输出
视图
学习
查询
配置
内容
字段
官方
文档
类型
输入
就是
思路
情况
数据库
数据查询
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
合巢柘皋产业新城空间数据库
收款 企业网络安全 数字证书
青少年网络安全ppt
我的世界服务器不安全账号
邮政手持软件开发
将图片上传到服务器
简述数据库受著作权保护的条件
dell服务器散热器拆解
中外数据库
佛山市六意互联网科技有限公司
永诚恒互联网科技有限公司
信息网络安全工程师证书
新乡市润泽网络技术有限公司
信息网络安全在多网合一
a7m3 影像数据库错误
安卓软件开发培训费用
贩卖数据库犯法吗
廊坊专题护苗网络安全系列课堂
网络技术论文报告
华夏物联网络技术有限公司
成都管理软件开发费用多少
qq炫舞唱歌断开服务器
jso数据库增删改查
数据库查阅所有记录
网络安全工作会议讲话稿
数据库中R7_SDE
北京 数据库恢复
电脑怎么弄一个数据库
计算机网络技术苏州订单班
广州艾飞软件开发