`
aswang
  • 浏览: 847628 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

基于Oracle Streams + Oracle AQ 捕获变更,发布变更(一)

阅读更多
要求:使用Oracle Streams捕获某个用户下部分表的DML操作变更,并通过Oracle的AQ(高级队列)对外发布,然后Java端通过JMS来获取变更,并执行后续同步操作。
数据库部分:
1、使用Streams要求Oracle以归档模式运行,归档日志默认存放在DB_RECOVERY_FILE_DEST指定的位置,由于该区域有大小限制,
所以,为了避免空间不足导致的后续问题,首先需要修改Oracle归档日志目录
 
alter system set log_archive_dest='/home/dev/app/dev/oradata/archivelog';
--其中archivelog目录为手动创建
--如果该语句执行失败,可以先尝试将DB_RECOVERY_FILE_DEST置空,再执行上述语句。
alter system set DB_RECOVERY_FILE_DEST='';
 
2、修改数据库为归档模式运行:
shutdown immediate;
startup nomount;
alter database mount;
alter database archivelog;
archive log list;
3、修改系统参数:
alter system set aq_tm_processes=2 scope=both;
alter system set global_names=true scope=both;
alter system set job_queue_processes=10 scope=both;
alter system set parallel_max_servers=20 scope=both;
alter system set undo_retention=3600 scope=both;
alter system set streams_pool_size=25M scope=spfile; 
 
4、更改数据设置,增加补充日志信息
alter database add supplemental log data;
--该语句是在数据库级别添加的补充日志,也可以单独为表设置
alter table user add supplemental log group g_user (id) always;
 
5、创建用于Oracle Streams的表空间和用户
create tablespace streams_tbs datafile '/home/dev/app/dev/oradata/stream_tbs.dbf'
  size 25M Reuse autoextend on maxsize unlimited;
 
create user stradmin
identified by stradmin
default tablespace streams_tbs quota unlimited on streams_tbs;  
6、授权
grant dba,select_catalog_role to strmadmin; 
 
--执行系统存储过程,分配权限
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
 privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
 grantee => 'stradmin',
 grant_option => FALSE);
END;
/
 
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
 privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
 grantee => 'stradmin',
 grant_option => FALSE);
END;
/
 
7、创建用于捕获进程的队列:
exec DBMS_STREAMS_ADM.SET_UP_QUEUE(
  queue_table => 'streams_queue_table', 
  queue_name => 'streams_queue');
 
8、创建用于捕获数据变更的捕获进程:
begin
   dbms_streams_adm.add_schema_rules(
     schema_name => 'scott',
     streams_type => 'capture',
     streams_name => 'capture_jms',
     queue_name => 'strmadmin.streams_queue',
     include_dml => true,
     include_ddl => false,
  source_database => 'orcl',
     inclusion_rule => true,
  and_condition => ':lcr.get_compatible()<dbms_streams.max_compatible()'
  );
 end;
/
说明:其中include_dml和include_ddl用于指明是否希望捕获DML操作和DDL操作。
and_condition用于添加捕获规则。
该捕获进程会捕获scott用户下所有表的dml变更
 
9、创建用于Oracle AQ的消息队列
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table => 'jms_queue_table',
        Queue_payload_type => 'SYS.AQ$_jms_bytes_message',
        multiple_consumers => true);
   DBMS_AQADM.CREATE_QUEUE(
      Queue_name => 'jms_queue',
      Queue_table => 'jms_queue_table');
   DBMS_AQADM.START_QUEUE(queue_name=> 'jms_queue');
END;
/
说明:
    应用进程会将streams_queue队列中的消息出队,然后经过转换插入到该队列中。
    queue_payload_type用于指定该队列存放的用户数据的类型。
 
10、初始化SCN
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN(
 source_schema_name => 'scott',
    source_database_name => 'orcl',
    instantiation_scn => iscn,
    recursive => true);
END;
/
11、为AQ消息队列创建代理和订阅者
--6、为消息队列创建代理
exec DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'jms_agent');
exec DBMS_AQADM.ENABLE_DB_ACCESS(agent_name => 'jms_agent',
  db_username => 'strmadmin');
 
DECLARE
 subscriber SYS.AQ$_AGENT;
BEGIN
 subscriber := SYS.AQ$_AGENT('jms_agent', NULL, NULL);
 SYS.DBMS_AQADM.ADD_SUBSCRIBER(
  queue_name => 'strmadmin.jms_queue',
  subscriber => subscriber,
  rule => NULL,
  transformation => NULL);
END;
/
备注:这里的agent_name和jms_queue会在java的JMS中使用到。
 
12、创建应用进程
BEGIN
   dbms_streams_adm.add_schema_rules(
      schema_name => 'scott',
      streams_type => 'apply',
      streams_name => 'apply_jms',
      queue_name => 'strmadmin.streams_queue',
      include_dml => true,
      include_ddl => false,
      source_database => 'orcl',
      inclusion_rule => true);
END;
/
说明:默认情况下,应用进程会将streams_queue中的消息(LCR)出队,然后将消息对应的变更应用到指定的数据库中。
但在这里,我们希望通过应用进程进行消息转换,并插入到AQ的消息队列中,所以后面还需要针对目标表设置处理器。
 
13、创建用于应用进程的消息处理存储过程,对LCR进行转换,并插入到jms_queue队列
CREATE OR REPLACE PROCEDURE enq_jms_lcr(in_any IN SYS.ANYDATA) IS
  message SYS.AQ$_jms_bytes_message;
  enqueue_options dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  msgid raw(16);
  lcr SYS.LCR$_ROW_RECORD;
  rc PLS_INTEGER;
  agent sys.aq$_agent := sys.aq$_agent('jms_agent', null, NULL);
 
  ID number;
  tablename varchar2(100);
  action varchar2(100);
  updatetime date;
BEGIN
 rc := in_any.GETOBJECT(lcr);
 tablename := lcr.get_object_name();
 action := lcr.get_command_type();
 updatetime := lcr.get_source_time();
 
 if action = 'UPDATE' or action = 'DELETE' then
  ID := lcr.get_value('OLD', 'ID').ACCESSnumber();
 end if;
 
 if action = 'INSERT' then
  ID := lcr.get_value('NEW', 'ID').ACCESSnumber();
 end if;
 
 message := SYS.AQ$_jms_bytes_message.construct;
    message.set_type('map');
    message.set_userid('strmadmin');
    message.set_appid('jms_sync');
    message.set_groupid('jms');
    message.set_groupseq(1);
 
 message.set_long_property('id',ID);
 message.set_long_property('areacode',areacode);
 message.set_string_property('tablename', tablename);
 message.set_string_property('action', action);
 
 dbms_aq.enqueue(queue_name => 'strmadmin.jms_queue',
                  enqueue_options => enqueue_options,
                  message_properties => message_properties,
                  payload => message,
                  msgid => msgid);
END;
/
说明:在这个存储过程中,我们仅仅抓取了数据变更的关键信息:主键ID,表名,DML动作以及时间。
然后将信息封装在SYS.AQ$_jms_bytes_message中,并放入队列jms_queue中。
 
14、创建一个空的消息处理存储过程
CREATE OR REPLACE PROCEDURE empty_jms_lcr(in_any IN SYS.ANYDATA) IS
i number;
BEGIN
i:= 1;
END;
/
说明:由于前面的捕获进程会捕获用户下所有受支持的表的dml变更,但有少数表,我们不希望对其处理,
这个时候我们就可以通过一个什么都不做的存储过程,来覆盖应用进程的默认行为。否则,应用进程会默认将
这些表的变更进行处理,这时就会报错ORA-25215 user_data type and queue type do not match.
 
15、在上面建立了两个消息处理存储过程以后,我们需要将其设置为对应表的处理器。
--工具函数
create or replace PROCEDURE set_dml_handler(tablename varchar2, action varchar2, handler varchar2) is
begin
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name => tablename ,
    object_type => 'TABLE',
    operation_name => action,
    error_handler => false,
    user_procedure => handler,
    apply_database_link => NULL);
end;
/
 
--为目标表配置处理器stradmin.enq_jms_lcr
declare
  here number;
  type array_type is table of varchar2(100);
  --定义需要同步的表
  tables array_type := array_type('scott.USER');
 
  cursor c_table is
    select T.OWNER||'.'||T.TABLE_NAME tablename
      from dba_tables t
     where t.owner = 'scott'
       and instr(t.table_name,'EST') = 1 ;
 
begin
  for t in c_table loop
  here := 0;
    for i IN 1..tables.count loop
      if tables(i) = t.tablename then
        here := 1;
      end if;
    end loop;
 
 if here = 1 then
  dbms_output.put_line('enq_jms_lcr : ' || t.tablename);
        set_dml_handler(t.tablename, 'UPDATE', 'strmadmin.enq_jms_lcr');
        set_dml_handler(t.tablename, 'INSERT', 'strmadmin.enq_jms_lcr');
        set_dml_handler(t.tablename, 'DELETE', 'strmadmin.enq_jms_lcr');
 end if;
 
 if here = 0 then
  dbms_output.put_line('empty_jms_lcr : ' || t.tablename);
        set_dml_handler(t.tablename, 'UPDATE', 'strmadmin.empty_jms_lcr');
        set_dml_handler(t.tablename, 'INSERT', 'strmadmin.empty_jms_lcr');
        set_dml_handler(t.tablename, 'DELETE', 'strmadmin.empty_jms_lcr');
 end if;
  end loop;
end;
/
说明:上述存储过程,对应希望同步的表会设置enq_jms_lcr,而对于其它表则设置empty_jms_lcr。
 
16、启动应用进程以及捕获进程
exec DBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'apply_jms',parameter => 'disable_on_error',value => 'n');
exec DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_jms');
exec DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'capture_jms');
17、在完成上述操作以后,我们可以尝试对USER表进行INSERT、UPDATE和DELTE操作,然后查看jms_queue_table表中是否包含对应的转换后的数据
select t.rowid, t.* from jms_queue_table t;
如果有对应的记录,则说明配置成功,后面可以编写java代码来接受消息并处理了。
 
过程中的坑:
        在为应用进程创建消息处理存储过程以后,应用进程运行一次或几次之后,总会报错,状态为ABORT,错误信息为:ORA-25215 user_data type and queue type do not match.
错误原因描述的很清楚了,但是在检查了前后的语句,并没有发现数据类型与队列的数据类型不一致,
存储过程中放入队列的数据类型与创建队列时的类型均为SYS.AQ$_jms_bytes_message。
在google了N多信息,大体的解释都一样,说是保证进入队列的数据类型与队列表中的数据类型一致。
      于是在这个地方卡了很久,最后突然意识到,上面的捕获进程默认捕获用户下所有的表的变更,
而应用进程会自动的从捕获进程的队列strems_queue中获取所有变更,并将其应用到jms_queue队列。
但在刚开始时,默认只给需要同步的表设置了消息处理器,那么其它的表的变更消息在应用进程获取以后,
就没有转换为SYS.AQ$_jms_bytes_message,而是直接尝试进入队列,于是就报错类型不匹配了。
      在知道这个原因后,就为其它不需要同步的表,建立了一个空的消息处理器,来覆盖应用进程默认的行为,
从而确保应用进程不再报错终止运行。
 
 
错误排查:
查看捕获进程状态:
SELECT t.capture_name, t.capture_user, t.capture_type, t.status FROM DBA_CAPTURE t Where t.capture_name = 'CAPTURE_JMS';
 
查看应用进程状态:
select apply_name, queue_name, status, t.error_number, t.error_message from dba_apply t;
 
查看应用进程错误信息:
select t.rowid, t.* from dba_apply_error t where t.apply_name = 'APPLY_JMS';
查看消息队列:
SELECT t.rowid, t.* from JMS_QUEUE_TABLE t;
查看相关规则:
select * from dba_streams_rules;
分享到:
评论

相关推荐

    Oracle Streams Step by Step PPT

    Streams的特点还包括独立的捕获、传播和应用服务,以及通过Advanced Queues(AQ)实现的消息队列功能,支持发布/订阅模式,允许基于规则的捕获、转换和应用,与数据库的紧密集成,以及高度灵活的配置和内联转换。...

    Oracle Stream + AQ + JMS 搭建步骤

    1. **捕获进程**(dev_capture_jms)根据设定的捕获规则解析Oracle数据对象的消息变更记录(LCR),并将消息在捕获队列(dev_streams_queue)中入队。 2. **应用进程**(dev_apply_jms)根据设定的应用规则,会将...

    基于Oracle Streams的数据库实时备份技术研究.pdf

    Oracle Streams 是 Oracle 数据库的一种高级数据复制技术,用于在数据库之间同步数据,实现数据的实时备份和灾难恢复。该技术的核心在于捕获数据库中的变化事件,如插入、更新和删除操作,并将这些变化以逻辑改变...

    Oracle STREAMS数据同步复制技术应用.pdf

    该技术基于Oracle数据库的日志处理能力,通过解析数据库重做日志(Redo Log)来捕获数据变化,并将这些变化传播到其他目标数据库。 在Oracle数据库中,日志挖掘(LOGMINER)是一项关键的技术,它允许用户分析重做...

    Oracle 11g Streams Implementer’s Guide.rar

    Oracle 11g Streams 是一个强大的数据复制解决方案,它提供了实时的数据流功能,使得数据库间的同步、数据迁移和故障恢复成为可能。这个技术是Oracle数据库系统中的一个重要组件,尤其适用于分布式环境下的数据管理...

    oracle stream详细配置

    Oracle Streams 是一个强大的数据复制和集成工具,可以实现数据的实时捕获、处理以及传送功能。通过使用Oracle Streams,用户能够轻松地在不同的数据库之间进行数据同步、复制,支持多种应用场景,如灾难恢复、数据...

    Oracle 9i Streams

    它通过捕获源数据库(Source Database)上的变更数据,并将这些变更应用到目标数据库(Target Database)上,从而实现了数据的同步更新。此技术广泛应用于分布式数据库环境下的数据复制、日志挖掘等多种场景。 ####...

    通过Oracle的流复制实现数据库之间的同步

    1. **逻辑变更记录(LCR)**:Oracle Streams通过捕获数据库中的DDL(数据定义语言)和DML(数据操纵语言)操作,将其转换为逻辑变更记录,这些记录描述了对数据库所做的具体更改。 2. **队列管理器**:用于存储和...

    Oracle API

    6. **其他API**:Oracle还提供了其他的API,如XML DB API用于处理XML数据,Oracle Advanced Queuing (AQ) API用于消息队列,以及Oracle Streams API用于数据复制和变更数据捕获。 Oracle API的使用不仅涉及如何调用...

    Oracle Stream-安装配置

    Streams利用了Oracle的高级队列技术,通过解析归档日志,将数据库的DDL(数据定义语言)和DML(数据操纵语言)转换为可执行的语句,以实现在不同数据库间的同步。这一功能可以复制整个数据库或者数据库中的特定对象...

    Oracle Stream配置详细步骤

    通过遵循上述步骤,我们可以构建一个基于 Oracle Stream 的高效、可靠的数据复制和同步系统。这种系统不仅可以提高数据库的高可用性和容错能力,还能满足企业级应用的需求,例如报表生成、数据分析以及灾难恢复等。

    Oracle10g数据库自动诊断监视工具

    11. **其他问题**:例如,由于扩展磁盘分配导致的争用,移动对象高水位引发的冲突,以及Streams和AQ(Advanced Queuing)的问题。 ADDM的使用简化了DBA的工作流程,不再需要手动分析复杂的性能数据和猜测可能的问题...

    Oracle数据库10g-对自我管理数据库进行管理1-P.pptx

    高级特性如高级复制(Advanced Replication)、Oracle Streams、AQ(Advanced Queuing)以及虚拟专用数据库等,也得到了自动化的支持。 总之,Oracle数据库10g通过自我管理特性,实现了数据库管理的自动化、智能化...

    stream replication 双向复制

    Stream Replication 双向复制是一种高级的Oracle数据库复制技术,用于在两个数据库之间实现数据的实时同步。这种复制方式不仅可以确保数据的一致性,还能在主数据库出现问题时提供即时的故障转移能力。以下是对...

Global site tag (gtag.js) - Google Analytics