如何使用Apache Flink实现自定义Sink
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。socket发送过来的数据,把Strin
千家信息网最后更新 2025年12月03日如何使用Apache Flink实现自定义Sink
如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。
创建数据库和表
create database imooc_flink;create table student(id int(11) NOT NULL AUTO_INCREMENT,name varchar(25),age int(10),primary key(id))
导入mysql依赖:
mysql mysql-connector-java 8.0.15
创建POJO Student
package com.vincent.course05;public class Student { private int id; private String name; private int age; @Override public String toString() { return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; }}然后创建连接,SinkToMySQL.java
package com.vincent.course05;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction{ PreparedStatement ps; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into student(id, name, age) values(?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //关闭连接和释放资源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每条数据的插入都要调用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke(Student value, Context context) throws Exception { //组装数据,执行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.setInt(3, value.getAge()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.cj.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; }}
main方法:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource source = environment.socketTextStream("192.168.152.45", 9999); SingleOutputStreamOperator studentStream = source.map(new MapFunction() { @Override public Student map(String value) throws Exception { String[] splits = value.split(","); Student student = new Student(); student.setId(Integer.parseInt(splits[0])); student.setName(splits[1]); student.setAge(Integer.parseInt(splits[2])); return student; } }); studentStream.addSink(new SinkToMySQL()); environment.execute("JavaCustomSinkToMysql"); } 从socket中获取数据,数据格式使用逗号分割,在控制台中输入:
nc -lk 99991,tom,23
检查数据库,数据库中多了一条数据
mysql> select * from student;+----+------+------+| id | name | age |+----+------+------+| 1 | tom | 23 |+----+------+------+1 row in set (0.00 sec)
这样就很方便的使用自定义的sink,写入到MySQL中去。
总结:
第一步:继承RichSinkFunction
T就是想要写入的对象类型 第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次
默认情况下open方法的并行度不是1,跟具体的电脑有关系。
看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
方法
数据库
对象
内容
更多
类型
问题
束手无策
为此
不用
原因
周期
对此
就是
情况
技能
控制台
时候
格式
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
昆明云服务器ssh
应用数据库的专业
儿童网络安全的含义
阿里云服务器数据库想删掉在哪里
福建超频服务器价格
青岛东王子软件开发
泰国网络安全公益视频
欣邦网络技术有限公司
什么是网络技术学
国家安全与网络安全教育
福建数字化城管软件开发
数据库怎么判断记录数
tomcat 服务器集群
怎样关闭access数据库
最新计算机网络安全相关书籍
软件开发完成后最麻烦的是什么
如何找到点赞过的服务器我的世界
雨晴网络安全
创建空数据库
农业银行软件开发中心 天津
its2数据库二级结构预测
怎么做软件开发简历
java软件开发参考文献
加强网络安全管理视频
怎样改服务器开机密码
安徽广东网络安全培训线上学习
彩视软件开发商
数据库的资源复制是什么
泰兴进口网络技术大概费用
云端时代最关键是网络安全吗