千家信息网

12C数据库Goldengate同步异构数据库Kafka中间件之二

发表于:2025-11-11 作者:千家信息网编辑
千家信息网最后更新 2025年11月11日,前两天测试环境的需求将上线生产环境,需求还是a. 数据源:SSP库 ssp.m_system_user,Oracle DB 12.1.0.2.0,Ogg Version 12.2.0.1.1 OGGC
千家信息网最后更新 2025年11月11日12C数据库Goldengate同步异构数据库Kafka中间件之二

前两天测试环境的需求将上线生产环境,需求还是
a. 数据源:SSP库 ssp.m_system_user,Oracle DB 12.1.0.2.0,Ogg Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401_FBO
b. 数据目标:MySQL DLS库 DLS_SYSTEM_USER
c.kafka集群:10.1.1.247 ,Ogg Version 12.3.0.1.0 OGGCORE_OGGADP.12.3.0.1.0GA_PLATFORMS_170828.1608

由于oracle 12c已经是多租户架构,在使用OGG同步的时候,需要考虑下面一些情况

一个 CDB包含多个PDB
抽取模式只能是integrated(集成)模式,不支持claasic capture传统方式捕获;
因为要使用integrated extract,因此,需要能访问log mining server,而这个只能从cdb$root中访问;
源端要使用common user,即c##ogg这种用户来访问源端DB,这样能访问DB的redo log & all pdbs。
在GGSCI或参数文件中,可以使用pdb.schema.table来访问具体的表或序列;
可以在参数文件 中使用sourceCatalog参数,指定一个PDB,后面的参数中只需要schema.table即可;
目标端每个pdb要有一个replicat进程,即一个replicat进程只能投递到一个PDB,不能投递到多个。
源端OGG用户需要赋权:dbms_goldengate_auth.grant_admin_privilege('C##GGADMIN',container=>'all'),同时建议将ogg的用户设置赋权为:grant dba to c##ogg container=all;
源端DB除了以前要打开归档, force logging, 最小附加日志,可能还需要打开一个开关:alter system set enable_goldengate_replication=true;

具体实施步骤;
1、为要同步的表添加附加日志
dblogin userid ogg@SALESPDB,password OGG_PROD
add trandata ssp.m_system_user

2、 添加抽取进程
add extract EXT_KAF4,integrated tranlog, begin now --12c区别
add EXTTRAIL ./dirdat/k4, extract EXT_KAF4,MEGABYTES 200

GGSCI (salesdb as ogg@salesdb/SALESPDB) 17> dblogin useridalias ggroot
Successfully logged into database CDB$ROOT.
--得在cdb里面注册ext进程
register extract EXT_KAF4 database container(SALESPDB)

edit params EXT_KAF4

extract EXT_KAF4
userid c##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k4,FORMAT RELEASE 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;

3、添加投递进程:
add extract PMP_KAF4, exttrailsource ./dirdat/k4
add rmttrail ./dirdat/b4,EXTRACT PMP_KAF4,MEGABYTES 200

eidt params PMP_KAF4

EXTRACT PMP_KAF4
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/b4,format release 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;

4.添加初始化进程
ADD EXTRACT ek_04, sourceistable ---源端添加

EXTRACT ek_04
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/b5,maxfiles 999, megabytes 500,format release 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;

5.生成def文件:
edit param salesdb4

USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/salesdb4.def,format release 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;

在OGG_HOME下执行如下命令生成def文件
defgen paramfile ./dirprm/salesdb4.prm

将生成的def文件传到目标端kafka--$OGG_HOME/dirdef下

---mysql 数据库地址:10.1.11.24 mysql地址

---kafka地址 10.1.1.246:0000,10.1.1.247:0000 主题:DLS_MERCHANT

1、添加初始化进程:---dirprm
ADD replicat rp_06,specialrun

EDIT PARAMS rp_06

SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka_k05.props
SOURCEDEFS ./dirdef/salesdb4.def
EXTFILE ./dirdat/b5
reportcount every 1 minutes, rate
grouptransops 10000
map SALESPDB.SSP.M_SYSTEM_USER,TARGET DLS.DLS_SYSTEM_USER;

2、添加复制进程:
add replicat rep_04,exttrail ./dirdat/b4
edit params rep_04

REPLICAT rep_04
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka_k05.props
SOURCEDEFS ./dirdef/salesdb4.def
reportcount every 1 minutes, rate
grouptransops 10000
map SALESPDB.SSP.M_SYSTEM_USER,TARGET DLS.DLS_SYSTEM_USER;

3、参数配置:
cd /home/appgroup/ogg/ggs12/dirprm

custom_kafka_producer.properties 文件

vi kafka_k05.props

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= DLS_MERCHANT
#The following selects the message key using the concatenated primary keys
#gg.handler.kafkahandler.keyMappingTemplate=
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.SchemaTopicName= DLS_MERCHANT
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

----启动各个进程
1,源端抽取,投递,初始化进程启动
2,目标端启动初始化进程、执行初始化脚本,启动复制进程

start rp_06

./replicat paramfile ./dirprm/rp_06.prm reportfile ./dirrpt/rp_06.rpt -p INITIALDATALOAD

start rep_04

进程 文件 参数 目标 数据 地址 用户 抽取 生成 同步 多个 日志 模式 环境 赋权 需求 附加 数据库 最小 主题 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 深圳外猫通网络技术有限公司 学校网络安全教育培训平台 绝地求生 服务器名称 城市轨道计算机网络技术题库 网络安全周2开幕式 济南软件开发客服 数据库生成随机字母 王者荣耀服务器有影响吗 数据库与应用实践教程读后感 数据库课程设计说明书 碳计算数据库名称 提高计算机网络安全的方法 下载站用什么服务器 天网公共安全网络技术有限公司 web服务器安全防护中标 医疗行业网络安全检查表 亿通医院管理系统服务器 亚马逊云服务器账号密码错误 嘉定区正规金融网络技术服务热线 服务器小机全称是什么 浏阳国税局网络安全培训班 中国四大数据库之一 基础数据库 人口法人 上海网络安全学习人才缺口大 web服务器负载均衡怎么做 把数据库的数据清空 网络安全法律法规案例 龙之信条连接服务器中 网络安全展示区 网络安全产品竞品分析怎么写
0