千家信息网

使用tunnel同步PG数据到kafka

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,tunnel同步PG数据到kafka来自哈罗单车开源的组件。支持同步PG数据到kafka或者ES。https://github.com/hellobike/tunneltunnel整体的部署比较简单的
千家信息网最后更新 2025年12月02日使用tunnel同步PG数据到kafka

tunnel同步PG数据到kafka

来自哈罗单车开源的组件。支持同步PG数据到kafka或者ES。


https://github.com/hellobike/tunnel

tunnel整体的部署比较简单的

需要事先部署好zkkafka(我下面演示的是单节点的zkkafka

节点部署关系:

192.168.2.4 部署zkkafkapg10运行在1921端口

192.168.2.189 部署tunnel

确保已开启PG的逻辑复制

wal_level = 'logical';

max_replication_slots = 20

注意这个设置要重启PG进程的

然后,创建测试库表和同步用的账号

CREATE DATABASE test_database;

\c test_database

create table test_1 (id int primary key , name char(40));

create table test_2 (id int primary key , name char(40));

CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;

GRANT CONNECT ON DATABASE test_database to test_rep;

vim pg_hba.conf 增加2行配置:

host all test_rep 192.168.2.0/24 md5

host replication test_rep 192.168.2.0/24 md5

然后 reload PG

192.168.2.189 机器上去 编译tunnel

注意: tunnel的启动需要事先安装好oracle jdk 1.8

git clone https://github.com/hellobike/tunnel

cd tunnel

mvn clean package -Dmaven.test.skip=true

cd target

unzip AppTunnelService.zip

cd AppTunnelService

vim conf/test.yml 内容如下:

tunnel_subscribe_config:

pg_dump_path: '/usr/local/pgsql-10.10/bin/pg_dump'

subscribes:

- slotName: slot_for_test

pgConnConf:

host: 192.168.2.4

port: 1921

database: test_database

user: test_rep

password: xxxx

rules:

- {table: test_1, pks: ['id'], topic: test_1_logs}

- {table: test_2, pks: ['id'], topic: test_2_logs}

kafkaConf:

addrs:

- 192.168.2.4:9092

tunnel_zookeeper_address: 192.168.2.4:2181

前台启动:

java -server -classpath conf/*:lib/* com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788 # 暴露prometheus metric7788端口(配置监控不是这里的重点,也很简单,暂时先跳过)


然后,我们再在PG10上面的test_database2张表随便造些数据,然后可以看到kafka里面已经有数据了(下图是通过kafkamanager kafka-eagle的结果)。





格式化下,数据就是这样的:

UPDATE的记录的样子:

{

"dataList": [{

"dataType": "integer",

"name": "id",

"value": "1111"

}, {

"dataType": "character",

"name": "name",

"value": "大狗蛋 "

}],

"eventType": "UPDATE",

"lsn": 10503246616,

"schema": "public",

"table": "test_1"

}

DELETE的记录的样子:

{

"dataList": [{

"dataType": "integer",

"name": "id",

"value": "3"

}],

"eventType": "DELETE",

"lsn": 10503247064,

"schema": "public",

"table": "test_1"

}

0