如何理解Apache Flink CDC原理与使用
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,如何理解Apache Flink CDC原理与使用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。CDC (Change Data Cap
千家信息网最后更新 2025年12月01日如何理解Apache Flink CDC原理与使用
如何理解Apache Flink CDC原理与使用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
CDC (Change Data Capture)
Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容。
以上是之前的mysql binlog日志处理流程,例如canal监听binlog把日志写入到kafka中。而Apache Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段。
mysql开启binlog canal同步binlog数据写入到kafka flink读取kakfa中的binlog数据进行相关的业务处理。
整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。简单来说链路会变成这样
也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。
Flink 1.11中实现了mysql-cdc与postgre-CDC,也就是说在Flink 1.11中我们可以直接通过Flink来直接消费mysql,postgresql的数据进行业务的处理。
使用场景
数据库数据的增量同步 数据库表之上的物理化视图 维表join 其他业务处理 ...
MySQL CDC 操作实践
首先需要保证mysql数据库开启了binlog。未开启请查阅相关资料进行binlog的启用。自建默认是不开启binlog的。
源表
DROP TABLE IF EXISTS `t_test`;
CREATE TABLE `t_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`ip` varchar(255) DEFAULT NULL,
`size` bigint(20) DEFAULT NULL
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=183 DEFAULT CHARSET=utf8mb4;
添加mysql-cdc相关依赖
com.alibaba.ververica
flink-connector-mysql-cdc
1.1.0
compile
相关代码实现
def main(args: Array[String]): Unit = {
val envSetting = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, envSetting)
val sourceDDL =
"CREATE TABLE test_binlog (" +
" id INT NOT NULl," +
" ip STRING," +
" size INT" +
") WITH (" +
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost'," +
"'port' = '3306'," +
"'username' = 'root'," +
"'password' = 'cain'," +
"'database-name' = 'test'," +
"'table-name' = 't_test'" +
")"
// 输出目标表
val sinkDDL =
"CREATE TABLE test_sink (\n" +
" ip STRING,\n" +
" countSum BIGINT,\n" +
" PRIMARY KEY (ip) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")"
val exeSQL =
"INSERT INTO test_sink " +
"SELECT ip, COUNT(1) " +
"FROM test_binlog " +
"GROUP BY ip"
tableEnv.executeSql(sourceDDL)
tableEnv.executeSql(sinkDDL)
val result = tableEnv.executeSql(exeSQL)
result.print()
}
启动flink job,并且插入数据
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
...
插入数据可直接在console中看到flink处理的结果

Apache Flink CDC的方式替代了之前的canal+kafka节点.直接通过sql的方式来实现对mysql数据的同步。
看完上述内容,你们掌握如何理解Apache Flink CDC原理与使用的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
处理
同步
业务
内容
数据库
原理
也就是
也就是说
整体
方式
方法
日志
更多
链路
问题
消费
束手无策
为此
代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
区块链网络安全怎么学
芜湖宦勇网络技术有限公司
数据库一般连接
数据库挖掘系统结构
退伍军人能做软件开发吗
hannah数据库
网络安全和禁毒知识
gta5连接不到游戏服务器
正规网络技术报价
电网哪个部门负责网络安全
上海一站式网络技术
广东东莞最好的软件开发技校
电力系统 软件开发
辅警大专计算机网络技术
维护移动网络安全
在工厂软件开发
iec服务器靠谱吗
思科计算机网络技术实训答案
网络技术自我发展规划
怎么改rfid数据库
qq游戏收件服务器
一张有关网络安全的书漫画作品
济南交警系统软件开发公司
网络安全讲师 兼职
软件开发合作存在问题
网络技术学习励志头像
筛选不连续的数据库
数据库书店管理系统ER图
区块链数据库
盐城营销软件开发定做价格