oracle队列(AQ)---实现orale到mysql的数据同步
发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,高级队列(Advanced Queue,简称AQ):高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。利用高级队列来实现消息在两
千家信息网最后更新 2025年11月07日oracle队列(AQ)---实现orale到mysql的数据同步高级队列(Advanced Queue,简称AQ):
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
我们利用触发器+高级序列 然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进mysql,这是idc机房oracle到idc的mysql的过程; 至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个 Oracle 高级队列具体开发步骤如下: (1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2" as object ( table_name number(3), opr number(2) , record_id number(20) ); 二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE begin sys.dbms_aqadm.create_queue_table( queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0, storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )'); end; / 三:创建队列: begin sys.dbms_aqadm.create_queue ( queue_name => 'infoservice.q_info_sync_message', queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3, retry_delay => 1, retention_time => 0 ); end; 启动队列: begin sys.dbms_aqadm.start_queue (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true ); end;
暂停队列: begin sys.dbms_aqadm.stop_queue ( queue_name => '队列名'); end; 删除队列: begin sys.dbms_aqadm.drop_queue ( queue_name => '队列名'); end; 删除队列表: begin sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名'); end; 四:入队存储过程DBMS_AQ.enqueue: create or replace procedure infoservice.info_sync_enqueue( table_name in number, opr in number,record_id in number) as begin DECLARE queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; BEGIN my_message := info_sync_type2( table_name, opr,record_id ); DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id); COMMIT; END; end info_sync_enqueue; 出对存储过程DBMS_AQ.DEQUEUE: create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number) as begin DECLARE queue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_id RAW(200); my_message info_sync_type2; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'infoservice.q_info_sync_message', dequeue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id ); COMMIT; table_name := my_message.table_name; opr :=my_message. opr; record_id := my_message.record_id ; END; end info_sync_dequeue; 五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete: create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO for each row declare queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; opr number(2); table_name number(3); begin opr := 0; table_name := 22;###就把标识位置为22, CASE WHEN inserting THEN if (:NEW.OK_STATUS = 'Y') then opr := 1; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN updating THEN if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then opr := 2; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN deleting THEN opr := 3; my_message := info_sync_type2( table_name, opr,:old.record_id ); END CASE; if ( opr != 0) then DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties =>message_properties, payload => my_message, msgid => message_id); end if; end; 这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据, call infoservice.info_sync_enqueue(21,1,39097403); call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015 执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。 SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE; Name Null? Type ----------------------------------------- -------- ---------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY TIMESTAMP(6) EXPIRATION NUMBER TIME_MANAGER_INFO TIMESTAMP(6) LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME TIMESTAMP(6) ENQ_UID VARCHAR2(30) ENQ_TID VARCHAR2(30) DEQ_TIME TIMESTAMP(6) DEQ_UID VARCHAR2(30) DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA INFOSERVICE.INFO_SYNC_TYPE2 USER_PROP ANYDATA 查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id 这三个是之前type中定义的字段。 select * from INFOSERVICE.T_INFO_SYNC_MESSAGE;

总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
我们利用触发器+高级序列 然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进mysql,这是idc机房oracle到idc的mysql的过程; 至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个 Oracle 高级队列具体开发步骤如下: (1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2" as object ( table_name number(3), opr number(2) , record_id number(20) ); 二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE begin sys.dbms_aqadm.create_queue_table( queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0, storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )'); end; / 三:创建队列: begin sys.dbms_aqadm.create_queue ( queue_name => 'infoservice.q_info_sync_message', queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 3, retry_delay => 1, retention_time => 0 ); end; 启动队列: begin sys.dbms_aqadm.start_queue (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true ); end;
暂停队列: begin sys.dbms_aqadm.stop_queue ( queue_name => '队列名'); end; 删除队列: begin sys.dbms_aqadm.drop_queue ( queue_name => '队列名'); end; 删除队列表: begin sys.dbms_aqadm.drop_queue_table (queue_table => '队列表名'); end; 四:入队存储过程DBMS_AQ.enqueue: create or replace procedure infoservice.info_sync_enqueue( table_name in number, opr in number,record_id in number) as begin DECLARE queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; BEGIN my_message := info_sync_type2( table_name, opr,record_id ); DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id); COMMIT; END; end info_sync_enqueue; 出对存储过程DBMS_AQ.DEQUEUE: create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number) as begin DECLARE queue_options DBMS_AQ.DEQUEUE_OPTIONS_T; message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; message_id RAW(200); my_message info_sync_type2; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'infoservice.q_info_sync_message', dequeue_options => queue_options, message_properties => message_properties, payload => my_message, msgid => message_id ); COMMIT; table_name := my_message.table_name; opr :=my_message. opr; record_id := my_message.record_id ; END; end info_sync_dequeue; 五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete: create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO for each row declare queue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_id RAW(16); my_message info_sync_type2; opr number(2); table_name number(3); begin opr := 0; table_name := 22;###就把标识位置为22, CASE WHEN inserting THEN if (:NEW.OK_STATUS = 'Y') then opr := 1; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN updating THEN if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then opr := 2; my_message := info_sync_type2( table_name, opr,:new.record_id ); end if; WHEN deleting THEN opr := 3; my_message := info_sync_type2( table_name, opr,:old.record_id ); END CASE; if ( opr != 0) then DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message', enqueue_options => queue_options, message_properties =>message_properties, payload => my_message, msgid => message_id); end if; end; 这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据, call infoservice.info_sync_enqueue(21,1,39097403); call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015 执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。 SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE; Name Null? Type ----------------------------------------- -------- ---------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY TIMESTAMP(6) EXPIRATION NUMBER TIME_MANAGER_INFO TIMESTAMP(6) LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME TIMESTAMP(6) ENQ_UID VARCHAR2(30) ENQ_TID VARCHAR2(30) DEQ_TIME TIMESTAMP(6) DEQ_UID VARCHAR2(30) DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA INFOSERVICE.INFO_SYNC_TYPE2 USER_PROP ANYDATA 查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id 这三个是之前type中定义的字段。 select * from INFOSERVICE.T_INFO_SYNC_MESSAGE;

总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
队列
数据
高级
字段
标识
触发器
过程
应用
时候
存储
同步
三个
两个
之间
信息
序列
数据库
机房
消息
系统
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
改华为服务器风扇
软件开发和网络技术的区别
网络安全评估
为什么我总是提示服务器错误
csgo 网络安全宣传周
浙江软件开发加盟商市场报价
网络安全的手抄报最简单漂亮
服务器仓库管理软件有哪些
arcgis 数据库建库
2021年网络安全知识问答
文件服务器高级安全设置
医疗机构数据库
软件开发类高职
国产服务器品牌 SAP认证
优质软件开发公司电话
物联网应用软件开发
端游游戏服务器多少钱
征途服务器管理器要注册吗
如何给服务器做数据
假设数据库只有两个事物
2021年河南警察学院网络安全
找大状互联网科技有限公司
数据库中地址怎么拆分为省市
互联网与科技有关的公司
服务器内存插槽不对
为了提高企业内部服务器的安全性
深圳市乐施软件开发
携程软件开发成本
调度自动化网络安全
天津服务器公司云空间