TiDB+FLINK进行数据实时统计的方法是什么
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容主要讲解"TiDB+FLINK进行数据实时统计的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"TiDB+FLINK进行数据实时统计的方法
千家信息网最后更新 2025年12月03日TiDB+FLINK进行数据实时统计的方法是什么
本篇内容主要讲解"TiDB+FLINK进行数据实时统计的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"TiDB+FLINK进行数据实时统计的方法是什么"吧!
1.TiCDC 具体配置如下
# 指定配置文件中涉及的库名、表名是否为大小写敏感# 该配置会同时影响 filter 和 sink 相关配置,默认为 truecase-sensitive = true# 是否输出 old value,从 v4.0.5 开始支持enable-old-value = true[filter]# 忽略指定 start_ts 的事务ignore-txn-start-ts = [1, 2]# 过滤器规则# 过滤规则语法:https://docs.pingcap.com/zh/tidb/stable/table-filter#表库过滤语法 指定了我的销售表rules = ['dspdev.sales_order_header'][mounter]# mounter 线程数,用于解码 TiKV 输出的数据worker-num = 16[sink]# 对于 MQ 类的 Sink,可以通过 dispatchers 配置 event 分发器# 支持 default、ts、rowid、table 四种分发器,分发规则如下:# - default:有多个唯一索引(包括主键)时按照 table 模式分发;只有一个唯一索引(或主键)按照 rowid 模式分发;如果开启了 old value 特性,按照 table 分发# - ts:以行变更的 commitTs 做 Hash 计算并进行 event 分发# - rowid:以所选的 HandleKey 列名和列值做 Hash 计算并进行 event 分发# - table:以表的 schema 名和 table 名做 Hash 计算并进行 event 分发# matcher 的匹配语法和过滤器规则语法相同dispatchers = [ {matcher = ['dspdev.*'], dispatcher = "ts"}]# 对于 MQ 类的 Sink,可以指定消息的协议格式# 目前支持 default、canal、avro 和 maxwell 四种协议。default 为 TiCDC Open Protocolprotocol = "canal"[cyclic-replication]# 是否开启环形同步enable = false# 当前 TiCDC 的复制 IDreplica-id = 1# 需要过滤掉的同步 IDfilter-replica-ids = [2,3]# 是否同步 DDLsync-ddl = true2 cdc sink 配置下游为kafka
--sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
这样就会将tidb cdc 数据以protobuf数据发完kafka,我们只需要在下游做解析就好 具体配置解释参考:tidb配置连接
3 新建spring boot项目 引入 canal-client,kafka等配置
pom引入如下:
4.0.0 org.springframework.boot spring-boot-starter-parent 2.3.4.RELEASE com.konka.dsp kafka-parse 0.0.1-SNAPSHOT kafka-parse Demo project for Spring Boot 11 1.2.70 org.springframework.boot spring-boot-starter org.apache.kafka kafka-streams com.alibaba fastjson ${fastjson.version} com.alibaba.otter canal.client 1.1.4 org.springframework.kafka spring-kafka org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.springframework.kafka spring-kafka-test test org.springframework.boot spring-boot-maven-plugin
properties 如下:
###########【Kafka集群】###########spring.kafka.bootstrap-servers=192.168.8.71:9092###########【初始化生产者配置】############ 重试次数spring.kafka.producer.retries=0# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)spring.kafka.producer.acks=1# 批量大小spring.kafka.producer.batch-size=16384# 提交延时spring.kafka.producer.properties.linger.ms=0# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了?# 生产端缓冲区大小spring.kafka.producer.buffer-memory = 33554432# Kafka提供的序列化和反序列化类spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 自定义分区器# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner###########【初始化消费者配置】############ 默认的消费组IDspring.kafka.consumer.properties.group.id=defaultConsumerGroup# 是否自动提交offsetspring.kafka.consumer.enable-auto-commit=true# 提交offset延时(接收到消息后多久提交offset)spring.kafka.consumer.auto.commit.interval.ms=1000# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;spring.kafka.consumer.auto-offset-reset=latest# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)spring.kafka.consumer.properties.session.timeout.ms=120000# 消费请求超时时间spring.kafka.consumer.properties.request.timeout.ms=180000# Kafka提供的序列化和反序列化类spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=com.alibaba.otter.canal.client.kafka.MessageDeserializer# 消费端监听的topic不存在时,项目启动会报错(关掉)spring.kafka.listener.missing-topics-fatal=false#过滤table和字段table.data = {"sales_order_header":"id,customer_name,total_amount,created_date"}# 设置批量消费# spring.kafka.listener.type=batch# 批量消费每次最多消费多少条消息sprint boot kafka 消费端代码如下:
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.serializer.SerializerFeature;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.FlatMessage;import com.alibaba.otter.canal.protocol.Message;import com.konka.dsp.kafkaparse.CanalKafkaClientExample;import com.konka.dsp.kafkaparse.tidb.KafkaMessage;import com.konka.dsp.kafkaparse.tidb.TicdcEventData;import com.konka.dsp.kafkaparse.tidb.TicdcEventDecoder;import com.konka.dsp.kafkaparse.tidb.TicdcEventFilter;import com.konka.dsp.kafkaparse.tidb.value.TicdcEventDDL;import com.konka.dsp.kafkaparse.tidb.value.TicdcEventResolve;import com.konka.dsp.kafkaparse.tidb.value.TicdcEventRowChange;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Optional;@Componentpublic class kafkaConsumer { protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class); // 消费监听 @Autowired private KafkaTemplate kafkaTemplate; @Value("#{${table.data}}") private Map map; @KafkaListener(topics = {"cdc-test"}) public void onMessage1(ConsumerRecord consumerRecord) throws UnsupportedEncodingException { Message message = consumerRecord.value(); long batchId = message.getId(); FlatMessage fm = new FlatMessage(); List entrys = message.getEntries(); for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } fm.setId(entry.getHeader().getExecuteTime()); fm.setDatabase(entry.getHeader().getSchemaName()); fm.setEs(entry.getHeader().getExecuteTime()); fm.setTs(entry.getHeader().getExecuteTime()); fm.setTable(entry.getHeader().getTableName()); fm.setType(rowChage.getEventType().name()); CanalEntry.EventType eventType = rowChage.getEventType(); fm.setIsDdl(rowChage.getIsDdl()); fm.setSql(rowChage.getSql()); Map mysqlTypes = new HashMap<>(); Map sqlType = new HashMap<>(); List pkNames = new ArrayList<>(); logger.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); String[] filtercolumn = map.get(entry.getHeader().getTableName()).split(","); logger.info(" filter --> column {}",filtercolumn); for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { fm.setData(saveRowData(rowData.getBeforeColumnsList(),pkNames,filtercolumn)); fm.setMysqlType(setMysqlTypes(rowData.getBeforeColumnsList(),filtercolumn)); fm.setSqlType(setSqlTypes(rowData.getBeforeColumnsList(),filtercolumn)); } else if (eventType == CanalEntry.EventType.INSERT) { fm.setData(saveRowData(rowData.getAfterColumnsList(),pkNames,filtercolumn)); fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(),filtercolumn)); fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(),filtercolumn)); } else { logger.info("-------> before->{}",rowData.getBeforeColumnsList().size()); fm.setOld(saveRowData(rowData.getBeforeColumnsList(),pkNames,filtercolumn)); logger.info("-------> after"); fm.setData(saveRowData(rowData.getAfterColumnsList(),pkNames,filtercolumn)); fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(),filtercolumn)); fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(),filtercolumn)); if(rowData.getBeforeColumnsList().size()==0&&rowData.getAfterColumnsList().size()>0){ fm.setType("INSERT"); } } } HashSet h = new HashSet(pkNames); pkNames.clear(); pkNames.addAll(h); fm.setPkNames(pkNames); } logger.info("json解析:{}",JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue)); kafkaTemplate.send("canal-data",JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));//// FlatMessage flatMessage = (FlatMessage)JSON.parseObject(flatMessageJson, FlatMessage.class); // 消费的哪个topic、partition的消息,打印出消息内容// KafkaMessage kafkaMessage = new KafkaMessage();// kafkaMessage.setKey(consumerRecord.key());// kafkaMessage.setValue(consumerRecord.value());// kafkaMessage.setOffset(consumerRecord.offset());// kafkaMessage.setPartition(consumerRecord.partition());// kafkaMessage.setTimestamp(consumerRecord.timestamp());// TicdcEventFilter filter = new TicdcEventFilter();// TicdcEventDecoder ticdcEventDecoder = new TicdcEventDecoder(kafkaMessage);// while (ticdcEventDecoder.hasNext()) {// TicdcEventData data = ticdcEventDecoder.next();// if (data.getTicdcEventValue() instanceof TicdcEventRowChange) {// boolean ok = filter.check(data.getTicdcEventKey().getTbl(), data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());// if (ok) {// // deal with row change event// } else {// // ignore duplicated messages// }// } else if (data.getTicdcEventValue() instanceof TicdcEventDDL) {// // deal with ddl event// } else if (data.getTicdcEventValue() instanceof TicdcEventResolve) {// filter.resolveEvent(data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());// // deal with resolve event// }// System.out.println(JSON.toJSONString(data, true));// } } private List 这里基本上将 tidb的数据转化为canal-json格式数据,这里我们继续将转化后的数据发完kafka,以便kafka 继续消费,这里有个点就是不知道为什么tidb出来的insert和update eventtype类型都是UPDATE,所以我在代码做了判断没有OLD的话基本上就是INSERT了
4.flink 本地开发 建议下载搭建好环境参考flink table 配置
具体参考官网 flinktable配置 把table相关jar包拷贝到flink下的lib目录下即可 这里的会用到另外一个知乎开源的相关包项目地址如下: https://github.com/pingcap-incubator/TiBigData/ 把项目编译完成以后把flink相关jar包拷贝到flink下的lib下
5 最后在我们的相关业务库配置表这里我上代码了:
import org.apache.flink.api.java.DataSet;import org.apache.flink.table.api.*;import org.apache.flink.table.expressions.TimeIntervalUnit;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;public class SalesOrderStream { public static Table report(Table transactions) { return transactions.select( $("customer_name"), $("created_date"), $("total_amount")) .groupBy($("customer_name"), $("created_date")) .select( $("customer_name"), $("total_amount").sum().as("total_amount"), $("created_date") ); } public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); TableEnvironment tEnv = TableEnvironment.create(settings);// tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +//// " id BIGINT not null,\n" +// " customer_name STRING,\n"+//// " dsp_org_name STRING,\n"+// " total_amount DECIMAL(38,2),\n" +//// " total_discount DECIMAL(16,2),\n" +//// " pay_amount DECIMAL(16,2),\n" +//// " total_amount DECIMAL(16,2),\n" +// " created_date TIMESTAMP(3)\n" +// ") WITH (\n" +// " 'connector' = 'mysql-cdc',\n" +// " 'hostname' = '192.168.8.73',\n" +// " 'port' = '4000',\n"+// " 'username' = 'flink',\n"+// " 'password' = 'flink',\n"+// " 'database-name' = 'dspdev',\n"+// " 'table-name' = 'sales_order_header'\n"+// ")"); tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" + " `id` BIGINT,\n"+ " `total_amount` DECIMAL(16,2) ,\n"+ " `customer_name` STRING,\n"+ " `created_date` TIMESTAMP(3) ,\n"+ " PRIMARY KEY (`id`) NOT ENFORCED "+ ") WITH (\n" + "'connector' = 'kafka',\n"+ "'topic' = 'canal-data',\n"+ "'properties.bootstrap.servers' = '192.168.8.71:9092',\n"+ "'properties.group.id' = 'test',\n"+ "'scan.startup.mode' = 'earliest-offset',\n"+ "'format' = 'canal-json'\n"+ ")"); tEnv.executeSql("CREATE TABLE spend_report (\n" + " customer_name STRING,\n" +// " total_amount DECIMAL(16,2),\n" +// " total_discount DECIMAL(16,2),\n" +// " pay_amount DECIMAL(16,2),\n" + " total_amount DECIMAL(16,2),\n" + " created_date TIMESTAMP(3),\n" + " PRIMARY KEY (customer_name,created_date) NOT ENFORCED" + ") WITH (\n" + " 'connector' = 'tidb',\n" + " 'tidb.database.url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\n" + " 'tidb.username' = 'flink',\n"+ " 'tidb.password' = 'flink',\n"+ " 'tidb.database.name' = 'dspdev',\n"+ " 'tidb.table.name' = 'spend_report'\n"+ ")"); Table transactions = tEnv.from("sales_order_header_stream"); report(transactions).executeInsert("spend_report"); }}这样在我数据库里面就可以实时统计当前的销售总价并写入数据库里,最后数据库数据如下:
到此,相信大家对"TiDB+FLINK进行数据实时统计的方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
数据
配置
消费
消息
生产
实时
方法
统计
序列
规则
语法
项目
内容
大小
数据库
时间
生产者
参考
同步
支持
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
毒理学数据库查找
idea直连数据库把负载打满
闵行区网络技术咨询服务优势
西安周至网络安全
链接数据库select
安特互联网科技有限公司
海口哪里有学习网络技术
人才管理数据库
361vpn服务器
ip数据库 mysql
鹤岗网络安全宣传周
ROCKBOX下载软件开发
四川电信招聘网络技术
女生有做网络安全培训的吗
普洱哪有软件开发定制
数据库设计的类型
网络安全模式不显示桌面
网络安全周开幕式在哪个城市举行
基本科学指标数据库
一般一个软件开发需要多少钱
网络技术在测绘工程的应用
金蝶加密服务器无效
数据库与信息管理技术
服务器三个小电脑的标志
互联网科技怎么建设
软件开发毕业设计中期检查表
县网络安全信息化委员会
软件开发外包安全
四川电信招聘网络技术
上海高科技网络技术特点