Flink批处理之读写Mysql
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,1、添加Maven坐标 mysql mysql-connector-java 5.1.48 org.apache.flink flin
千家信息网最后更新 2025年12月02日Flink批处理之读写Mysql
1、添加Maven坐标
mysql mysql-connector-java 5.1.48 org.apache.flink flink-jdbc_2.12 1.8.0 2、建表
CREATE TABLE `temp` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `time` varchar(255) DEFAULT NULL, `type` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf83、 Show Code
package com.fwmagic.flink.batch;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.types.Row;import java.util.concurrent.TimeUnit;public class BatchDemoOperatorMysql { public static void main(String[] args) throws Exception { String driverClass = "com.mysql.jdbc.Driver"; String dbUrl = "jdbc:mysql://localhost:3306/test"; String userNmae = "root"; String passWord = "123456"; String sql = "insert into test.temp (name,time,type) values (?,?,?)"; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); /** * 文件内容: * 关羽,2019-10-14 00:00:01,1 * 张飞,2019-10-14 00:00:02,2 * 赵云,2019-10-14 00:00:03,3 */ String filePath = "/Users/temp/data.csv"; //读csv文件内容,转成Row对象 DataSet outputData = env.readCsvFile(filePath).fieldDelimiter(",").types(String.class, String.class, Long.class).map(new MapFunction, Row>() { @Override public Row map(Tuple3 t) throws Exception { Row row = new Row(3); row.setField(0, t.f0.getBytes("UTF-8")); row.setField(1, t.f1.getBytes("UTF-8")); row.setField(2, t.f2.longValue()); return row; } }); //将Row对象写到mysql outputData.output(JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername(driverClass) .setDBUrl(dbUrl) .setUsername(userNmae) .setPassword(passWord) .setQuery(sql) .finish()); //触发执行 env.execute("insert data to mysql"); System.out.println("mysql写入成功!"); TimeUnit.SECONDS.sleep(6); //读mysql DataSource dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat() .setDrivername(driverClass) .setDBUrl(dbUrl) .setUsername(userNmae) .setPassword(passWord) .setQuery("select * from temp") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)) .finish()); //获取数据并打印 dataSource.map(new MapFunction() { @Override public String map(Row value) throws Exception { System.out.println(value); return value.toString(); } }).print(); }}
4、注意事项
- 数据写入mysql的DataSet泛型要求是row,需要转换;
- 数据读取的结果也是row类型,不能直接print,需要转换;
- 数据写入后一定要加上env.execute(),触发任务执行;
- 涉及到中文的,需要转换成UTF-8,不然数据库中会出现乱码。
数据
UTF-8
内容
对象
文件
成功
乱码
事项
任务
坐标
数据库
注意事项
类型
结果
关羽
赵云
中文
求是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器管理口ip地址
福山网络安全
读秀数据库一框式检索
西科大网络安全技术
数据库xml 结构
数据库表 树
长宁区项目网络技术怎么样
网页设计师网络技术问题
艾洛裳网络技术有限公司
关于网络安全的英语词汇
社交软件开发制作步骤
服务器移动关机
学习软件开发比较好的学校
软件开发专项奖励申请报告
四川智慧养老管理平台软件开发
杭州游卡网络技术有限公司好吗
java 服务器通信安全
石峰区软件开发培训学费
数据库与教育
金华精快软件开发有限公司
深圳身边软件开发公司
网络安全规定等级
铁木真网络技术代运营怎么样
有一份网络安全手册请查收
网络安全有哪几个特征
净网2018网络安全执法检查
国外网站的代理服务器
网络安全编程学什么
幻塔推荐服务器名称
网络安全法进社区宣传活动