`
aswang
  • 浏览: 847663 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

基于Oracle Streams + Oracle AQ 捕获变更,发布变更(二)

阅读更多
要求:使用Oracle Streams捕获某个用户下部分表的DML操作变更,并通过Oracle的AQ(高级队列)对外发布,然后Java端通过JMS来获取变更,并执行后续同步操作。
 
Java部分:
 
1、jms配置信息
jms.local.username=strmadmin
jms.local.password=strmadmin
jms.local.jdbcUrl=jdbc:oracle:thin:@127.0.0.1:1521:orcl
jms.local.queueName=jms_queue
jms.local.agentName=jms_agent
jms.local.batchSize=10
jms.local.receiveTimeout=3000
 
对应的java class:
public final class JmsConfig {
    public String username;
    public String password;
    public String jdbcUrl;
    public String queueName;
    public String agentName;
   
    public int batchSize = 10;
    public long receiveTimeout = 5000L;
}
 
2、获取TopicConnection
private TopicConnection getTopicConnection(String jdbcUrl, String username, String password) throws JMSException {
        Properties info = new Properties();
        info.put(username, password);
 
        TopicConnectionFactory topicConnectionFactory =
            AQjmsFactory.getTopicConnectionFactory(jdbcUrl,info);
       
        return topicConnectionFactory.createTopicConnection(username, password);
    }
 
3、建立连接,并获取session
JmsConfig config = ......;
TopicConnection conn = getTopicConnection(config.jdbcUrl, config.username, config.password);
AQjmsSession session = (AQjmsSession)conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
       
conn.start();
 
4、获取Topic,创建subscriber,并接受消息
AQjmsDestination topic = (AQjmsDestination)session.getTopic(config.username, config.queueName);
       
AQjmsConsumer subscriber = (AQjmsConsumer)session.createDurableSubscriber(topic,  config.agentName); 
       
topic.start(session, false, true);
 
Message m = subscriber.receive(config.receiveTimeout);
AQjmsBytesMessage payload = (AQjmsBytesMessage)msg;
Long id = payload.getLongProperty("id");
String tablename =  payload.getStringProperty("tablename");
......
说明:这里接受信息会阻塞,在超过给定的超时时间后,如果没有获取到消息,会返回null。
JMS还支持另外一种监听方式:MessageListener
subscriber.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    }
});
    
5、在获取到Message以后,就可以后去其中的数据,然后进行后续的操作,比如通过这些信息,去更新另一个库中对应的数据。
 
需要注意的问题:
1、需要的oracle相关的jar包:aqapi13_g,jmscommon,orai18n以及ojdbc5-11.2.0.3.0
2、由于数据库的编码为ZHS16GBK,而该编码Java本身不支持,导致在获取非数字的信息时,比如表名等会出现乱码。好找oracle自身有国际化支持,只需要将对应的jar包orai18n加入classpath即可。该jar包可以在oracle客户端或者服务端的jlib目录下面找到。
3、JMS消息接收在采用监听的方式时,是在独立的线程中进行的,如果当前线程没有阻塞,则该线程结束也会导致监听线程结束。可以通过调用subscriber.getListenerWorkerThread().join() 让当前线程等待监听线程结束。
4、在创建Topic时,createTopicSession(false, Session.AUTO_ACKNOWLEDGE) 需要设置Session.AUTO_ACKNOWLEDGE,这样当我们接受消息以后,JMS会自动回复确认消息,然后Oracle数据库消息队列jms_queue中的数据才会被移除,否则jms_queue队列的数据会一直存在。
分享到:
评论

相关推荐

    Oracle Streams Step by Step PPT

    Streams的特点还包括独立的捕获、传播和应用服务,以及通过Advanced Queues(AQ)实现的消息队列功能,支持发布/订阅模式,允许基于规则的捕获、转换和应用,与数据库的紧密集成,以及高度灵活的配置和内联转换。...

    Oracle Stream + AQ + JMS 搭建步骤

    1. **捕获进程**(dev_capture_jms)根据设定的捕获规则解析Oracle数据对象的消息变更记录(LCR),并将消息在捕获队列(dev_streams_queue)中入队。 2. **应用进程**(dev_apply_jms)根据设定的应用规则,会将...

    基于Oracle Streams的数据库实时备份技术研究.pdf

    而基于 Oracle Streams 的实时备份则可以在数据库操作的同时进行备份,一旦发生故障,可以从备份中恢复最近的变化,实现几乎无数据丢失的恢复。 Oracle Streams 的工作流程主要包括以下几个步骤: 1. **配置捕获...

    Oracle STREAMS数据同步复制技术应用.pdf

    该技术基于Oracle数据库的日志处理能力,通过解析数据库重做日志(Redo Log)来捕获数据变化,并将这些变化传播到其他目标数据库。 在Oracle数据库中,日志挖掘(LOGMINER)是一项关键的技术,它允许用户分析重做...

    Oracle 11g Streams Implementer’s Guide.rar

    Redo log files是Oracle数据库用来存储事务日志的关键组件,对于Streams来说,它们是捕获和传播更改的基础。 7. **Purging and Archiving**:Streams允许设置策略来自动清除不再需要的更改信息,以节省存储空间。...

    oracle stream详细配置

    Oracle Streams 是一个强大的数据复制和集成工具,可以实现数据的实时捕获、处理以及传送功能。通过使用Oracle Streams,用户能够轻松地在不同的数据库之间进行数据同步、复制,支持多种应用场景,如灾难恢复、数据...

    Oracle 9i Streams

    它通过捕获源数据库(Source Database)上的变更数据,并将这些变更应用到目标数据库(Target Database)上,从而实现了数据的同步更新。此技术广泛应用于分布式数据库环境下的数据复制、日志挖掘等多种场景。 ####...

    通过Oracle的流复制实现数据库之间的同步

    1. **逻辑变更记录(LCR)**:Oracle Streams通过捕获数据库中的DDL(数据定义语言)和DML(数据操纵语言)操作,将其转换为逻辑变更记录,这些记录描述了对数据库所做的具体更改。 2. **队列管理器**:用于存储和...

    Oracle API

    6. **其他API**:Oracle还提供了其他的API,如XML DB API用于处理XML数据,Oracle Advanced Queuing (AQ) API用于消息队列,以及Oracle Streams API用于数据复制和变更数据捕获。 Oracle API的使用不仅涉及如何调用...

    Oracle Stream-安装配置

    Streams利用了Oracle的高级队列技术,通过解析归档日志,将数据库的DDL(数据定义语言)和DML(数据操纵语言)转换为可执行的语句,以实现在不同数据库间的同步。这一功能可以复制整个数据库或者数据库中的特定对象...

    Oracle Stream配置详细步骤

    通过遵循上述步骤,我们可以构建一个基于 Oracle Stream 的高效、可靠的数据复制和同步系统。这种系统不仅可以提高数据库的高可用性和容错能力,还能满足企业级应用的需求,例如报表生成、数据分析以及灾难恢复等。

    Oracle10g数据库自动诊断监视工具

    11. **其他问题**:例如,由于扩展磁盘分配导致的争用,移动对象高水位引发的冲突,以及Streams和AQ(Advanced Queuing)的问题。 ADDM的使用简化了DBA的工作流程,不再需要手动分析复杂的性能数据和猜测可能的问题...

    Oracle数据库10g-对自我管理数据库进行管理1-P.pptx

    高级特性如高级复制(Advanced Replication)、Oracle Streams、AQ(Advanced Queuing)以及虚拟专用数据库等,也得到了自动化的支持。 总之,Oracle数据库10g通过自我管理特性,实现了数据库管理的自动化、智能化...

    stream replication 双向复制

    Stream Replication 双向复制是一种高级的Oracle数据库复制技术,用于在两个数据库之间实现数据的实时同步。这种复制方式不仅可以确保数据的一致性,还能在主数据库出现问题时提供即时的故障转移能力。以下是对...

Global site tag (gtag.js) - Google Analytics