如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据
发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,这篇文章主要讲解了"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何使用Tb
千家信息网最后更新 2025年12月04日如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据
这篇文章主要讲解了"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据"吧!
使用Tbale&SQL与Flink JDBC连接器读取MYSQL数据,并用GROUP BY语句根据一个或多个列对结果集进行分组。
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
GroupToMysql.java
package com.flink.examples.mysql;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;/** * @Description 使用Tbale&SQL与Flink JDBC连接器读取MYSQL数据,并用GROUP BY语句根据一个或多个列对结果集进行分组。 */public class GroupToMysql { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html 分区扫描 为了加速并行Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。 scan.partition.column:用于对输入进行分区的列名。 scan.partition.num:分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 */ //flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义 static String table_sql = "CREATE TABLE my_users (\n" + " id BIGINT,\n" + " name STRING,\n" + " age INT,\n" + " status INT,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" + " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" + " 'connector.table' = 'users', \n" + " 'connector.username' = 'root',\n" + " 'connector.password' = 'password' \n" +// " 'connector.read.fetch-size' = '10' \n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置setParallelism并行度 env.setParallelism(1); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册mysql数据维表 tEnv.executeSql(table_sql); //Table table = avg(tEnv); //Table table = count(tEnv); //Table table = min(tEnv); Table table = max(tEnv); //打印字段结构 table.printSchema(); //普通查询操作用toAppendStream //tEnv.toAppendStream(table, Row.class).print(); //group操作用toRetractStream //tEnv.toRetractStream(table, Row.class).print(); //table 转成 dataStream 流,Tuple2第一个参数flag是true表示add添加新的记录流,false表示retract表示旧的记录流 DataStream> behaviorStream = tEnv.toRetractStream(table, Row.class); behaviorStream.flatMap(new FlatMapFunction, Object>() { @Override public void flatMap(Tuple2 value, Collector 建表SQL
CREATE TABLE `users` ( `id` bigint(8) NOT NULL AUTO_INCREMENT, `name` varchar(40) DEFAULT NULL, `age` int(8) DEFAULT NULL, `status` tinyint(2) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
打印结果
root |-- status: INT |-- age1: INT0,160,181,211,282,31
感谢各位的阅读,以上就是"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据"的内容了,经过本文的学习后,相信大家对如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
语句
连接器
数据流
方法
分组
结果
学习
最大
最小
之和
内容
多个
数值
最大值
普通
任务
写法
功能
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
关于幼儿园网络安全讲话
sq2008数据库下载
常熟智能服务器客户至上
学校怎么核查饭卡数据库
大脚插件进不了燃烧远征服务器
新能源软件开发售后服务
更换根服务器
网络技术专业就业
什么是通信的客户端和服务器端
灵寿信息化软件开发专业服务
北京前端软件开发价钱
未来网络安全的职位
刀塔自走棋服务器选择上海没信号
红颜瑟网络技术
cs1.6怎么进入正常版服务器
国外dns 服务器
服务器如何防止误触关机
软件开发规范php
网络安全与黑客攻防宝典》
地理数据库和arcgis区别
路由器远程服务器没声音
高级数据库技术与应用电子版
2020网络安全主题教育
电脑租一个mc服务器多少钱
服务器硬盘监控
外交部网络安全事件白皮书
近年来出现的网络安全问题
软件开发人员干嘛的
orcle数据库被锁
网易云课堂计算机网络技术