论坛首页 Java企业应用论坛

Java实时获取oracle变更

浏览 19632 次
该帖已经被评为良好帖
作者 正文
   发表时间:2008-11-16   最后修改:2009-10-29
在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。

背景:

       要做一个车辆GPS监控系统,主要分两块:

    1.采集。由GPS厂商提供实时数据,通过UDP包接收
    2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
      

备选方案:

     1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
     2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。

方案评估:



    方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。

    方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。



最终方案:



    最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。

    关于捕获进程,请参考《Streams概述》《Streams捕获进程》



实现:




  

SQL代码


    创建表空间和用户:

   

Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;

--修改目标表(要捕获变更的表)追加日志
ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;


create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;


grant connect, resource, select_catalog_role to strmadmin;




授予相应权限

grant execute on dbms_aqadm to strmadmin;

grant execute on dbms_capture_adm to strmadmin;

grant execute on dbms_propagation_adm to strmadmin;

grant execute on dbms_streams_adm to strmadmin;

grant execute on dbms_apply_adm to strmadmin;

grant execute on dbms_flashback to strmadmin;

grant execute on dbms_aq to strmadmin;

grant execute on dbms_aqjms to strmadmin;

grant execute on dbms_aqin to strmadmin;

grant execute on dbms_aqjms_internal to strmadmin;
 



执行系统存储过程分配权限


   
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/



BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
 





以strmadmin帐户登录oracle




创建AQ,类型为JMS消息



BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table            => 'gpsstatus_queue_table',
        Queue_payload_type     => 'SYS.AQ$_JMS_MESSAGE',
        multiple_consumers  => false,
        compatible             => '8.1.5');
   DBMS_AQADM.CREATE_QUEUE(
      Queue_name          => 'gpsstatus_queue',
      Queue_table         => 'gpsstatus_queue_table');
   DBMS_AQADM.START_QUEUE(
      queue_name         => 'gpsstatus_queue');
END;
/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'gps_temp_queue_table',
    queue_name   => 'gps_temp_queue');
END;
/




为目标表创建捕获进程



BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'capture',
streams_name => 'capture_gps',
queue_name => 'gps_temp_queue',
include_dml => true,
include_ddl => false);
END;
/




初始化scn


DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'myoracle.TEST_GPS_STATUS',
source_database_name => 'TESTdb',
instantiation_scn => iscn);
END;
/
 



为消息队列创建代理



BEGIN
DBMS_AQADM.CREATE_AQ_AGENT(
agent_name => 'gpsstatus_agent');
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'gpsstatus_agent',
db_username => 'strmadmin');
END;
/
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.gpsstatus_queue',
subscriber => subscriber,
rule => NULL,
transformation => NULL);
END;
/




创建存储过程以决定将哪些信息放到消息队列里面





CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
message sys.aq$_jms_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
DEVICEID varchar2(11);
GATHERDATETIME date;
LONGITUDETYPE char(1);
LONGITUDEVALUE number ;
LATITUDETYPE char(1);
LATITUDEVALUE number ;
SPEED number ;
DIRECTION number ;
BEGIN
rc := in_any.GETOBJECT(lcr);
DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
message := sys.aq$_jms_message.construct(1);
--message.set_replyto(agent);
message.set_type('');
message.set_userid('strmadmin');
message.set_appid('');
message.set_groupid('');
message.set_groupseq('');
message.set_string_property('DEVICEID', DEVICEID);
message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
message.set_string_property('LATITUDETYPE', LATITUDETYPE);
message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
message.set_string_property('SPEED', to_char(SPEED) );
message.set_string_property('DIRECTION', to_char(DIRECTION) );
--指定消息生存时间
message_properties.expiration:=60;
dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
COMMIT;
END;
/




为目标表配置处理器




BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'myoracle.TEST_GPS_STATUS',
object_type => 'TABLE',
operation_name => 'UPDATE',  --可配置为insert,update,delete等
error_handler => false,
user_procedure => 'strmadmin.enq_gps_lcr',
apply_database_link => NULL);
END;
/




设定参数及启动捕获进程





BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'apply',
streams_name => 'apply_gps',
queue_name => 'strmadmin.gps_temp_queue',
include_dml => true,
include_ddl => false,
source_database => 'TESTdb');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_gps',
parameter => 'disable_on_error',
value => 'n');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_gps');
END;
/
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_gps');
END;
/




至此,捕获进程配置完毕

可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。





下面是java处理代码,可直接使用JMS接口

本例使用oracle提供的API






QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;

Queue queue = null;
QueueReceiver subscriber = null;
Message message = null;











log.info("开始连接 ");
queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
log.info("创建Queue Connection 成功");
queueConnection.start();
log.info("connection started");
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
.info("Queue session created");
queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
log.info("Queue getted");
subscriber = queueSession.createReceiver(queue);
log.info("初始化完成");




开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。





   

while (true) {
         message = subscriber.receive();//receive方法使没有新消息时,线程挂起
      //do something...
}
 







最后:



      本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。

    
   发表时间:2008-11-16  
好帖子,以前还不知道有这种实现。
另: javaeye 的 ADMIN 们,这不是灌水贴,我只是表示这个帖子对我有用,我被你们整怕了。。。
0 请登录后投票
   发表时间:2008-11-16  
为什么不在入库前,多做一步顺便扔到JMS队列里?
岂不是更简单?
不过看到楼主这个帖子我也学到了不少东西!
0 请登录后投票
   发表时间:2008-11-17  
duooluu 写道
为什么不在入库前,多做一步顺便扔到JMS队列里?
岂不是更简单?


其实也未尝不可,不过我认为楼主的办法更好。

数据的生产者和消费者根本没有必要相互耦合。

生产者:只管往数据库里插(无论以何种方式)
消费者:只管“实时获取oracle变更”即可

“入库前,多做一步顺便扔到JMS队列里”还有一个事务问题,对于要求较高情况很难满足。
0 请登录后投票
   发表时间:2008-11-18   最后修改:2008-11-18
myy 写道
duooluu 写道
为什么不在入库前,多做一步顺便扔到JMS队列里?
岂不是更简单?


其实也未尝不可,不过我认为楼主的办法更好。

数据的生产者和消费者根本没有必要相互耦合。

生产者:只管往数据库里插(无论以何种方式)
消费者:只管“实时获取oracle变更”即可

“入库前,多做一步顺便扔到JMS队列里”还有一个事务问题,对于要求较高情况很难满足。



不错,正是如此。
我们认为采集程序只负责采集和入库,不必要加重负担(只是这两项工作就快累趴下了),也不必与前端程序耦合,两者可以从不见面。采集程序也没必要和JMS打交道。

假设这种情况:数据源有多个。也许有两个采集程序同时在工作,还有一个DBA在手动更改数据,另外,还有外部系统通过webservice提交数据。
若直接使用JMS,难免要让DBA手动发送一条消息到队列?webservice也得发送消息,采集程序还得发送。

如果使用oracle捕获进程,在我们看来,不关心是谁修改了数据,只关心数据是否被修改。



0 请登录后投票
   发表时间:2008-12-15  
oralce还有这样的队列,不错。就好像数据库的“反向通知”。。。还有就是用oralce中的java是否也能实现相同功能,是否有相应的API呢?
0 请登录后投票
   发表时间:2008-12-16  
xuyao 写道
oralce还有这样的队列,不错。就好像数据库的“反向通知”。。。还有就是用oralce中的java是否也能实现相同功能,是否有相应的API呢?


捕获进程与AQ是属于oracle的,安装有oracle企业版便可直接进行配置。

至于对AQ的读写,oracle提供了很丰富的API,包括java,c++,pl/sql等,有兴趣可查看oracle技术文档
0 请登录后投票
   发表时间:2008-12-16  
不错 我们部门为了值班方便 写了一个警报程序 但不实时 明天去单位拿楼主方式试一下结合dwr

关于oracle API楼主能说详细写么?还真没接触过
0 请登录后投票
   发表时间:2008-12-16  
反向通知在MS SQL Server 2005中也有.
0 请登录后投票
   发表时间:2008-12-17  
对于Oracle技术的应用此帖不错!

但谈到系统设计,建议少用Oracle的基于数据库的这种通知机制。

只能说,也许楼主偏爱Oracle吧,每个程序员都有自己的偏爱的,呵呵!
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics