锁定老帖子 主题:Java实时获取oracle变更
该帖已经被评为良好帖
|
|
---|---|
作者 | 正文 |
发表时间:2008-11-16
最后修改:2009-10-29
背景: 要做一个车辆GPS监控系统,主要分两块: 1.采集。由GPS厂商提供实时数据,通过UDP包接收 2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况 备选方案: 1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。 2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。 方案评估: 方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。 方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。 最终方案: 最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。 关于捕获进程,请参考《Streams概述》,《Streams捕获进程》 实现: SQL代码 创建表空间和用户: Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited; --修改目标表(要捕获变更的表)追加日志 ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS; create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs; grant connect, resource, select_catalog_role to strmadmin; 授予相应权限 grant execute on dbms_aqadm to strmadmin; grant execute on dbms_capture_adm to strmadmin; grant execute on dbms_propagation_adm to strmadmin; grant execute on dbms_streams_adm to strmadmin; grant execute on dbms_apply_adm to strmadmin; grant execute on dbms_flashback to strmadmin; grant execute on dbms_aq to strmadmin; grant execute on dbms_aqjms to strmadmin; grant execute on dbms_aqin to strmadmin; grant execute on dbms_aqjms_internal to strmadmin; 执行系统存储过程分配权限 BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / 以strmadmin帐户登录oracle 创建AQ,类型为JMS消息 BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'gpsstatus_queue_table', Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE', multiple_consumers => false, compatible => '8.1.5'); DBMS_AQADM.CREATE_QUEUE( Queue_name => 'gpsstatus_queue', Queue_table => 'gpsstatus_queue_table'); DBMS_AQADM.START_QUEUE( queue_name => 'gpsstatus_queue'); END; / BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'gps_temp_queue_table', queue_name => 'gps_temp_queue'); END; / 为目标表创建捕获进程 BEGIN DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'myoracle.TEST_GPS_STATUS', streams_type => 'capture', streams_name => 'capture_gps', queue_name => 'gps_temp_queue', include_dml => true, include_ddl => false); END; / 初始化scn DECLARE iscn NUMBER; -- Variable to hold instantiation SCN value BEGIN iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER(); DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN( source_object_name => 'myoracle.TEST_GPS_STATUS', source_database_name => 'TESTdb', instantiation_scn => iscn); END; / 为消息队列创建代理 BEGIN DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'gpsstatus_agent'); DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'gpsstatus_agent', db_username => 'strmadmin'); END; / DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.gpsstatus_queue', subscriber => subscriber, rule => NULL, transformation => NULL); END; / 创建存储过程以决定将哪些信息放到消息队列里面 CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0); message sys.aq$_jms_message; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid raw(16); lcr SYS.LCR$_ROW_RECORD; rc PLS_INTEGER; DEVICEID varchar2(11); GATHERDATETIME date; LONGITUDETYPE char(1); LONGITUDEVALUE number ; LATITUDETYPE char(1); LATITUDEVALUE number ; SPEED number ; DIRECTION number ; BEGIN rc := in_any.GETOBJECT(lcr); DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2(); GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate(); LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar(); LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber(); LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar(); LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber(); SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber(); DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber(); message := sys.aq$_jms_message.construct(1); --message.set_replyto(agent); message.set_type(''); message.set_userid('strmadmin'); message.set_appid(''); message.set_groupid(''); message.set_groupseq(''); message.set_string_property('DEVICEID', DEVICEID); message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss')); message.set_string_property('LONGITUDETYPE', LONGITUDETYPE); message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) ); message.set_string_property('LATITUDETYPE', LATITUDETYPE); message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE)); message.set_string_property('SPEED', to_char(SPEED) ); message.set_string_property('DIRECTION', to_char(DIRECTION) ); --指定消息生存时间 message_properties.expiration:=60; dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => msgid); COMMIT; END; / 为目标表配置处理器 BEGIN DBMS_APPLY_ADM.SET_DML_HANDLER( object_name => 'myoracle.TEST_GPS_STATUS', object_type => 'TABLE', operation_name => 'UPDATE', --可配置为insert,update,delete等 error_handler => false, user_procedure => 'strmadmin.enq_gps_lcr', apply_database_link => NULL); END; / 设定参数及启动捕获进程 BEGIN DBMS_STREAMS_ADM.ADD_TABLE_RULES( table_name => 'myoracle.TEST_GPS_STATUS', streams_type => 'apply', streams_name => 'apply_gps', queue_name => 'strmadmin.gps_temp_queue', include_dml => true, include_ddl => false, source_database => 'TESTdb'); END; / BEGIN DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_gps', parameter => 'disable_on_error', value => 'n'); END; / BEGIN DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_gps'); END; / BEGIN DBMS_CAPTURE_ADM.START_CAPTURE( capture_name => 'capture_gps'); END; / 至此,捕获进程配置完毕 可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。 下面是java处理代码,可直接使用JMS接口 本例使用oracle提供的API QueueConnectionFactory queueConnectionFactory = null; QueueConnection queueConnection = null; QueueSession queueSession = null; Queue queue = null; QueueReceiver subscriber = null; Message message = null; log.info("开始连接 "); queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin"); queueConnection = queueConnectionFactory.createQueueConnection(userName, password); log.info("创建Queue Connection 成功"); queueConnection.start(); log.info("connection started"); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); .info("Queue session created"); queue = ((AQjmsSession) queueSession).getQueue(userName, queueName); log.info("Queue getted"); subscriber = queueSession.createReceiver(queue); log.info("初始化完成"); 开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。 while (true) { message = subscriber.receive();//receive方法使没有新消息时,线程挂起 //do something... } 最后: 本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2008-11-16
好帖子,以前还不知道有这种实现。
另: javaeye 的 ADMIN 们,这不是灌水贴,我只是表示这个帖子对我有用,我被你们整怕了。。。 |
|
返回顶楼 | |
发表时间:2008-11-16
为什么不在入库前,多做一步顺便扔到JMS队列里?
岂不是更简单? 不过看到楼主这个帖子我也学到了不少东西! |
|
返回顶楼 | |
发表时间:2008-11-17
duooluu 写道 为什么不在入库前,多做一步顺便扔到JMS队列里?
岂不是更简单? 其实也未尝不可,不过我认为楼主的办法更好。 数据的生产者和消费者根本没有必要相互耦合。 生产者:只管往数据库里插(无论以何种方式) 消费者:只管“实时获取oracle变更”即可 “入库前,多做一步顺便扔到JMS队列里”还有一个事务问题,对于要求较高情况很难满足。 |
|
返回顶楼 | |
发表时间:2008-11-18
最后修改:2008-11-18
myy 写道 duooluu 写道 为什么不在入库前,多做一步顺便扔到JMS队列里?
岂不是更简单? 其实也未尝不可,不过我认为楼主的办法更好。 数据的生产者和消费者根本没有必要相互耦合。 生产者:只管往数据库里插(无论以何种方式) 消费者:只管“实时获取oracle变更”即可 “入库前,多做一步顺便扔到JMS队列里”还有一个事务问题,对于要求较高情况很难满足。 不错,正是如此。 我们认为采集程序只负责采集和入库,不必要加重负担(只是这两项工作就快累趴下了),也不必与前端程序耦合,两者可以从不见面。采集程序也没必要和JMS打交道。 假设这种情况:数据源有多个。也许有两个采集程序同时在工作,还有一个DBA在手动更改数据,另外,还有外部系统通过webservice提交数据。 若直接使用JMS,难免要让DBA手动发送一条消息到队列?webservice也得发送消息,采集程序还得发送。 如果使用oracle捕获进程,在我们看来,不关心是谁修改了数据,只关心数据是否被修改。 |
|
返回顶楼 | |
发表时间:2008-12-15
oralce还有这样的队列,不错。就好像数据库的“反向通知”。。。还有就是用oralce中的java是否也能实现相同功能,是否有相应的API呢?
|
|
返回顶楼 | |
发表时间:2008-12-16
xuyao 写道 oralce还有这样的队列,不错。就好像数据库的“反向通知”。。。还有就是用oralce中的java是否也能实现相同功能,是否有相应的API呢?
捕获进程与AQ是属于oracle的,安装有oracle企业版便可直接进行配置。 至于对AQ的读写,oracle提供了很丰富的API,包括java,c++,pl/sql等,有兴趣可查看oracle技术文档 |
|
返回顶楼 | |
发表时间:2008-12-16
不错 我们部门为了值班方便 写了一个警报程序 但不实时 明天去单位拿楼主方式试一下结合dwr
关于oracle API楼主能说详细写么?还真没接触过 |
|
返回顶楼 | |
发表时间:2008-12-16
反向通知在MS SQL Server 2005中也有.
|
|
返回顶楼 | |
发表时间:2008-12-17
对于Oracle技术的应用此帖不错!
但谈到系统设计,建议少用Oracle的基于数据库的这种通知机制。 只能说,也许楼主偏爱Oracle吧,每个程序员都有自己的偏爱的,呵呵! |
|
返回顶楼 | |