- 浏览: 70644 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
wukele:
呵呵,整理附件下载该多好。
Java实时获取oracle变更 -
zz_1999:
lz,你piggyback方式试过没,我试了客户端没反映。
dwr reverse ajax -
jroad:
还有这个DefaultScriptSession 和scrip ...
DWR3-ReverseAjax-半推实现 -
jroad:
有点不明白,朋友的sessionmap从那时来的,请教下.
...
DWR3-ReverseAjax-半推实现
在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。
背景:
要做一个车辆GPS监控系统,主要分两块:
1.采集。由GPS厂商提供实时数据,通过UDP包接收
2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。
最终方案:
最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
关于捕获进程,请参考《Streams概述》,《Streams捕获进程》
实现:
SQL代码
创建表空间和用户:
Sql代码 复制代码
1. Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
2.
3. --修改目标表(要捕获变更的表)追加日志
4. ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
5.
6.
7. create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
8.
9.
10. grant connect, resource, select_catalog_role to strmadmin;
Sql代码
1. Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
2.
3. --修改目标表(要捕获变更的表)追加日志
4. ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
5.
6.
7. create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
8.
9.
10. grant connect, resource, select_catalog_role to strmadmin;
Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
--修改目标表(要捕获变更的表)追加日志
ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
grant connect, resource, select_catalog_role to strmadmin;
授予相应权限
Sql代码 复制代码
1. grant execute on dbms_aqadm to strmadmin;
2.
3. grant execute on dbms_capture_adm to strmadmin;
4.
5. grant execute on dbms_propagation_adm to strmadmin;
6.
7. grant execute on dbms_streams_adm to strmadmin;
8.
9. grant execute on dbms_apply_adm to strmadmin;
10.
11. grant execute on dbms_flashback to strmadmin;
12.
13. grant execute on dbms_aq to strmadmin;
14.
15. grant execute on dbms_aqjms to strmadmin;
16.
17. grant execute on dbms_aqin to strmadmin;
18.
19. grant execute on dbms_aqjms_internal to strmadmin;
20.
Sql代码
1. grant execute on dbms_aqadm to strmadmin;
2.
3. grant execute on dbms_capture_adm to strmadmin;
4.
5. grant execute on dbms_propagation_adm to strmadmin;
6.
7. grant execute on dbms_streams_adm to strmadmin;
8.
9. grant execute on dbms_apply_adm to strmadmin;
10.
11. grant execute on dbms_flashback to strmadmin;
12.
13. grant execute on dbms_aq to strmadmin;
14.
15. grant execute on dbms_aqjms to strmadmin;
16.
17. grant execute on dbms_aqin to strmadmin;
18.
19. grant execute on dbms_aqjms_internal to strmadmin;
20.
grant execute on dbms_aqadm to strmadmin;
grant execute on dbms_capture_adm to strmadmin;
grant execute on dbms_propagation_adm to strmadmin;
grant execute on dbms_streams_adm to strmadmin;
grant execute on dbms_apply_adm to strmadmin;
grant execute on dbms_flashback to strmadmin;
grant execute on dbms_aq to strmadmin;
grant execute on dbms_aqjms to strmadmin;
grant execute on dbms_aqin to strmadmin;
grant execute on dbms_aqjms_internal to strmadmin;
执行系统存储过程分配权限
Sql代码 复制代码
1. BEGIN
2. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
3. privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4. grantee => 'strmadmin',
5. grant_option => FALSE);
6. END;
7. /
8.
9.
10.
11. BEGIN
12. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
13. privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
14. grantee => 'strmadmin',
15. grant_option => FALSE);
16. END;
17. /
18.
Sql代码
1. BEGIN
2. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
3. privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4. grantee => 'strmadmin',
5. grant_option => FALSE);
6. END;
7. /
8.
9.
10.
11. BEGIN
12. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
13. privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
14. grantee => 'strmadmin',
15. grant_option => FALSE);
16. END;
17. /
18.
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
以strmadmin帐户登录oracle
创建AQ,类型为JMS消息
Sql代码 复制代码
1. BEGIN
2. DBMS_AQADM.CREATE_QUEUE_TABLE(
3. Queue_table => 'gpsstatus_queue_table',
4. Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
5. multiple_consumers => false,
6. compatible => '8.1.5');
7. DBMS_AQADM.CREATE_QUEUE(
8. Queue_name => 'gpsstatus_queue',
9. Queue_table => 'gpsstatus_queue_table');
10. DBMS_AQADM.START_QUEUE(
11. queue_name => 'gpsstatus_queue');
12. END;
13. /
14. BEGIN
15. DBMS_STREAMS_ADM.SET_UP_QUEUE(
16. queue_table => 'gps_temp_queue_table',
17. queue_name => 'gps_temp_queue');
18. END;
19. /
Sql代码
1. BEGIN
2. DBMS_AQADM.CREATE_QUEUE_TABLE(
3. Queue_table => 'gpsstatus_queue_table',
4. Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
5. multiple_consumers => false,
6. compatible => '8.1.5');
7. DBMS_AQADM.CREATE_QUEUE(
8. Queue_name => 'gpsstatus_queue',
9. Queue_table => 'gpsstatus_queue_table');
10. DBMS_AQADM.START_QUEUE(
11. queue_name => 'gpsstatus_queue');
12. END;
13. /
14. BEGIN
15. DBMS_STREAMS_ADM.SET_UP_QUEUE(
16. queue_table => 'gps_temp_queue_table',
17. queue_name => 'gps_temp_queue');
18. END;
19. /
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
Queue_table => 'gpsstatus_queue_table',
Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
multiple_consumers => false,
compatible => '8.1.5');
DBMS_AQADM.CREATE_QUEUE(
Queue_name => 'gpsstatus_queue',
Queue_table => 'gpsstatus_queue_table');
DBMS_AQADM.START_QUEUE(
queue_name => 'gpsstatus_queue');
END;
/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'gps_temp_queue_table',
queue_name => 'gps_temp_queue');
END;
/
为目标表创建捕获进程
Sql代码 复制代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'capture',
5. streams_name => 'capture_gps',
6. queue_name => 'gps_temp_queue',
7. include_dml => true,
8. include_ddl => false);
9. END;
10. /
Sql代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'capture',
5. streams_name => 'capture_gps',
6. queue_name => 'gps_temp_queue',
7. include_dml => true,
8. include_ddl => false);
9. END;
10. /
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'capture',
streams_name => 'capture_gps',
queue_name => 'gps_temp_queue',
include_dml => true,
include_ddl => false);
END;
/
初始化scn
Sql代码 复制代码
1. DECLARE
2. iscn NUMBER; -- Variable to hold instantiation SCN value
3. BEGIN
4. iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
5. DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
6. source_object_name => 'myoracle.TEST_GPS_STATUS',
7. source_database_name => 'TESTdb',
8. instantiation_scn => iscn);
9. END;
10. /
11.
Sql代码
1. DECLARE
2. iscn NUMBER; -- Variable to hold instantiation SCN value
3. BEGIN
4. iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
5. DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
6. source_object_name => 'myoracle.TEST_GPS_STATUS',
7. source_database_name => 'TESTdb',
8. instantiation_scn => iscn);
9. END;
10. /
11.
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'myoracle.TEST_GPS_STATUS',
source_database_name => 'TESTdb',
instantiation_scn => iscn);
END;
/
为消息队列创建代理
Sql代码 复制代码
1. BEGIN
2. DBMS_AQADM.CREATE_AQ_AGENT(
3. agent_name => 'gpsstatus_agent');
4. DBMS_AQADM.ENABLE_DB_ACCESS(
5. agent_name => 'gpsstatus_agent',
6. db_username => 'strmadmin');
7. END;
8. /
9. DECLARE
10. subscriber SYS.AQ$_AGENT;
11. BEGIN
12. subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
13. SYS.DBMS_AQADM.ADD_SUBSCRIBER(
14. queue_name => 'strmadmin.gpsstatus_queue',
15. subscriber => subscriber,
16. rule => NULL,
17. transformation => NULL);
18. END;
19. /
Sql代码
1. BEGIN
2. DBMS_AQADM.CREATE_AQ_AGENT(
3. agent_name => 'gpsstatus_agent');
4. DBMS_AQADM.ENABLE_DB_ACCESS(
5. agent_name => 'gpsstatus_agent',
6. db_username => 'strmadmin');
7. END;
8. /
9. DECLARE
10. subscriber SYS.AQ$_AGENT;
11. BEGIN
12. subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
13. SYS.DBMS_AQADM.ADD_SUBSCRIBER(
14. queue_name => 'strmadmin.gpsstatus_queue',
15. subscriber => subscriber,
16. rule => NULL,
17. transformation => NULL);
18. END;
19. /
BEGIN
DBMS_AQADM.CREATE_AQ_AGENT(
agent_name => 'gpsstatus_agent');
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'gpsstatus_agent',
db_username => 'strmadmin');
END;
/
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.gpsstatus_queue',
subscriber => subscriber,
rule => NULL,
transformation => NULL);
END;
/
创建存储过程以决定将哪些信息放到消息队列里面
Sql代码 复制代码
1. CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
2. --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
3. message sys.aq$_jms_message;
4. enqueue_options dbms_aq.enqueue_options_t;
5. message_properties dbms_aq.message_properties_t;
6. msgid raw(16);
7. lcr SYS.LCR$_ROW_RECORD;
8. rc PLS_INTEGER;
9. DEVICEID varchar2(11);
10. GATHERDATETIME date;
11. LONGITUDETYPE char(1);
12. LONGITUDEVALUE number ;
13. LATITUDETYPE char(1);
14. LATITUDEVALUE number ;
15. SPEED number ;
16. DIRECTION number ;
17. BEGIN
18. rc := in_any.GETOBJECT(lcr);
19. DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
20. GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
21. LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
22. LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
23. LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
24. LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
25. SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
26. DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
27. message := sys.aq$_jms_message.construct(1);
28. --message.set_replyto(agent);
29. message.set_type('');
30. message.set_userid('strmadmin');
31. message.set_appid('');
32. message.set_groupid('');
33. message.set_groupseq('');
34. message.set_string_property('DEVICEID', DEVICEID);
35. message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
36. message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
37. message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
38. message.set_string_property('LATITUDETYPE', LATITUDETYPE);
39. message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
40. message.set_string_property('SPEED', to_char(SPEED) );
41. message.set_string_property('DIRECTION', to_char(DIRECTION) );
42. --指定消息生存时间
43. message_properties.expiration:=60;
44. dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
45. enqueue_options => enqueue_options,
46. message_properties => message_properties,
47. payload => message,
48. msgid => msgid);
49. COMMIT;
50. END;
51. /
Sql代码
1. CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
2. --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
3. message sys.aq$_jms_message;
4. enqueue_options dbms_aq.enqueue_options_t;
5. message_properties dbms_aq.message_properties_t;
6. msgid raw(16);
7. lcr SYS.LCR$_ROW_RECORD;
8. rc PLS_INTEGER;
9. DEVICEID varchar2(11);
10. GATHERDATETIME date;
11. LONGITUDETYPE char(1);
12. LONGITUDEVALUE number ;
13. LATITUDETYPE char(1);
14. LATITUDEVALUE number ;
15. SPEED number ;
16. DIRECTION number ;
17. BEGIN
18. rc := in_any.GETOBJECT(lcr);
19. DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
20. GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
21. LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
22. LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
23. LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
24. LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
25. SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
26. DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
27. message := sys.aq$_jms_message.construct(1);
28. --message.set_replyto(agent);
29. message.set_type('');
30. message.set_userid('strmadmin');
31. message.set_appid('');
32. message.set_groupid('');
33. message.set_groupseq('');
34. message.set_string_property('DEVICEID', DEVICEID);
35. message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
36. message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
37. message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
38. message.set_string_property('LATITUDETYPE', LATITUDETYPE);
39. message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
40. message.set_string_property('SPEED', to_char(SPEED) );
41. message.set_string_property('DIRECTION', to_char(DIRECTION) );
42. --指定消息生存时间
43. message_properties.expiration:=60;
44. dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
45. enqueue_options => enqueue_options,
46. message_properties => message_properties,
47. payload => message,
48. msgid => msgid);
49. COMMIT;
50. END;
51. /
CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
message sys.aq$_jms_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
DEVICEID varchar2(11);
GATHERDATETIME date;
LONGITUDETYPE char(1);
LONGITUDEVALUE number ;
LATITUDETYPE char(1);
LATITUDEVALUE number ;
SPEED number ;
DIRECTION number ;
BEGIN
rc := in_any.GETOBJECT(lcr);
DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
message := sys.aq$_jms_message.construct(1);
--message.set_replyto(agent);
message.set_type('');
message.set_userid('strmadmin');
message.set_appid('');
message.set_groupid('');
message.set_groupseq('');
message.set_string_property('DEVICEID', DEVICEID);
message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
message.set_string_property('LATITUDETYPE', LATITUDETYPE);
message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
message.set_string_property('SPEED', to_char(SPEED) );
message.set_string_property('DIRECTION', to_char(DIRECTION) );
--指定消息生存时间
message_properties.expiration:=60;
dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
COMMIT;
END;
/
为目标表配置处理器
Sql代码 复制代码
1.
2. BEGIN
3. DBMS_APPLY_ADM.SET_DML_HANDLER(
4. object_name => 'myoracle.TEST_GPS_STATUS',
5. object_type => 'TABLE',
6. operation_name => 'UPDATE', --可配置为 insert,update,delete等
7. error_handler => false,
8. user_procedure => 'strmadmin.enq_gps_lcr',
9. apply_database_link => NULL);
10. END;
11. /
Sql代码
1. BEGIN
2. DBMS_APPLY_ADM.SET_DML_HANDLER(
3. object_name => 'myoracle.TEST_GPS_STATUS',
4. object_type => 'TABLE',
5. operation_name => 'UPDATE', --可配置为insert,update,delete等
6. error_handler => false,
7. user_procedure => 'strmadmin.enq_gps_lcr',
8. apply_database_link => NULL);
9. END;
10. /
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'myoracle.TEST_GPS_STATUS',
object_type => 'TABLE',
operation_name => 'UPDATE', --可配置为insert,update,delete等
error_handler => false,
user_procedure => 'strmadmin.enq_gps_lcr',
apply_database_link => NULL);
END;
/
设定参数及启动捕获进程
Sql代码 复制代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'apply',
5. streams_name => 'apply_gps',
6. queue_name => 'strmadmin.gps_temp_queue',
7. include_dml => true,
8. include_ddl => false,
9. source_database => 'TESTdb');
10. END;
11. /
12. BEGIN
13. DBMS_APPLY_ADM.SET_PARAMETER(
14. apply_name => 'apply_gps',
15. parameter => 'disable_on_error',
16. value => 'n');
17. END;
18. /
19. BEGIN
20. DBMS_APPLY_ADM.START_APPLY(
21. apply_name => 'apply_gps');
22. END;
23. /
24. BEGIN
25. DBMS_CAPTURE_ADM.START_CAPTURE(
26. capture_name => 'capture_gps');
27. END;
28. /
Sql代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'apply',
5. streams_name => 'apply_gps',
6. queue_name => 'strmadmin.gps_temp_queue',
7. include_dml => true,
8. include_ddl => false,
9. source_database => 'TESTdb');
10. END;
11. /
12. BEGIN
13. DBMS_APPLY_ADM.SET_PARAMETER(
14. apply_name => 'apply_gps',
15. parameter => 'disable_on_error',
16. value => 'n');
17. END;
18. /
19. BEGIN
20. DBMS_APPLY_ADM.START_APPLY(
21. apply_name => 'apply_gps');
22. END;
23. /
24. BEGIN
25. DBMS_CAPTURE_ADM.START_CAPTURE(
26. capture_name => 'capture_gps');
27. END;
28. /
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'apply',
streams_name => 'apply_gps',
queue_name => 'strmadmin.gps_temp_queue',
include_dml => true,
include_ddl => false,
source_database => 'TESTdb');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_gps',
parameter => 'disable_on_error',
value => 'n');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_gps');
END;
/
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_gps');
END;
/
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
Java代码 复制代码
1.
2. QueueConnectionFactory queueConnectionFactory = null;
3. QueueConnection queueConnection = null;
4. QueueSession queueSession = null;
5.
6. Queue queue = null;
7. QueueReceiver subscriber = null;
8. Message message = null;
Java代码
1. QueueConnectionFactory queueConnectionFactory = null;
2. QueueConnection queueConnection = null;
3. QueueSession queueSession = null;
4.
5. Queue queue = null;
6. QueueReceiver subscriber = null;
7. Message message = null;
QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
Queue queue = null;
QueueReceiver subscriber = null;
Message message = null;
Java代码 复制代码
1.
2. log.info("开始连接 ");
3. queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
4. queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
5. log.info("创建Queue Connection 成功");
6. queueConnection.start();
7. log.info("connection started");
8. queueSession = queueConnection.createQueueSession(false,
9. Session.AUTO_ACKNOWLEDGE);
10. .info("Queue session created");
11. queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
12. log.info("Queue getted");
13. subscriber = queueSession.createReceiver(queue);
14. log.info("初始化完成");
Java代码
1. log.info("开始连接 ");
2. queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
3. queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
4. log.info("创建Queue Connection 成功");
5. queueConnection.start();
6. log.info("connection started");
7. queueSession = queueConnection.createQueueSession(false,
8. Session.AUTO_ACKNOWLEDGE);
9. .info("Queue session created");
10. queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
11. log.info("Queue getted");
12. subscriber = queueSession.createReceiver(queue);
13. log.info("初始化完成");
log.info("开始连接 ");
queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
log.info("创建Queue Connection 成功");
queueConnection.start();
log.info("connection started");
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
.info("Queue session created");
queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
log.info("Queue getted");
subscriber = queueSession.createReceiver(queue);
log.info("初始化完成");
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
Java代码 复制代码
1. while (true) {
2. message = subscriber.receive();//receive方法使没有新消息时,线程挂起
3. //do something...
4. }
5.
Java代码
1. while (true) {
2. message = subscriber.receive();//receive方法使没有新消息时,线程挂起
3. //do something...
4. }
5.
while (true) {
message = subscriber.receive();//receive方法使没有新消息时,线程挂起
//do something...
}
背景:
要做一个车辆GPS监控系统,主要分两块:
1.采集。由GPS厂商提供实时数据,通过UDP包接收
2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。
最终方案:
最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
关于捕获进程,请参考《Streams概述》,《Streams捕获进程》
实现:
SQL代码
创建表空间和用户:
Sql代码 复制代码
1. Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
2.
3. --修改目标表(要捕获变更的表)追加日志
4. ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
5.
6.
7. create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
8.
9.
10. grant connect, resource, select_catalog_role to strmadmin;
Sql代码
1. Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
2.
3. --修改目标表(要捕获变更的表)追加日志
4. ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
5.
6.
7. create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
8.
9.
10. grant connect, resource, select_catalog_role to strmadmin;
Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
--修改目标表(要捕获变更的表)追加日志
ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
grant connect, resource, select_catalog_role to strmadmin;
授予相应权限
Sql代码 复制代码
1. grant execute on dbms_aqadm to strmadmin;
2.
3. grant execute on dbms_capture_adm to strmadmin;
4.
5. grant execute on dbms_propagation_adm to strmadmin;
6.
7. grant execute on dbms_streams_adm to strmadmin;
8.
9. grant execute on dbms_apply_adm to strmadmin;
10.
11. grant execute on dbms_flashback to strmadmin;
12.
13. grant execute on dbms_aq to strmadmin;
14.
15. grant execute on dbms_aqjms to strmadmin;
16.
17. grant execute on dbms_aqin to strmadmin;
18.
19. grant execute on dbms_aqjms_internal to strmadmin;
20.
Sql代码
1. grant execute on dbms_aqadm to strmadmin;
2.
3. grant execute on dbms_capture_adm to strmadmin;
4.
5. grant execute on dbms_propagation_adm to strmadmin;
6.
7. grant execute on dbms_streams_adm to strmadmin;
8.
9. grant execute on dbms_apply_adm to strmadmin;
10.
11. grant execute on dbms_flashback to strmadmin;
12.
13. grant execute on dbms_aq to strmadmin;
14.
15. grant execute on dbms_aqjms to strmadmin;
16.
17. grant execute on dbms_aqin to strmadmin;
18.
19. grant execute on dbms_aqjms_internal to strmadmin;
20.
grant execute on dbms_aqadm to strmadmin;
grant execute on dbms_capture_adm to strmadmin;
grant execute on dbms_propagation_adm to strmadmin;
grant execute on dbms_streams_adm to strmadmin;
grant execute on dbms_apply_adm to strmadmin;
grant execute on dbms_flashback to strmadmin;
grant execute on dbms_aq to strmadmin;
grant execute on dbms_aqjms to strmadmin;
grant execute on dbms_aqin to strmadmin;
grant execute on dbms_aqjms_internal to strmadmin;
执行系统存储过程分配权限
Sql代码 复制代码
1. BEGIN
2. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
3. privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4. grantee => 'strmadmin',
5. grant_option => FALSE);
6. END;
7. /
8.
9.
10.
11. BEGIN
12. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
13. privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
14. grantee => 'strmadmin',
15. grant_option => FALSE);
16. END;
17. /
18.
Sql代码
1. BEGIN
2. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
3. privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4. grantee => 'strmadmin',
5. grant_option => FALSE);
6. END;
7. /
8.
9.
10.
11. BEGIN
12. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
13. privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
14. grantee => 'strmadmin',
15. grant_option => FALSE);
16. END;
17. /
18.
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
以strmadmin帐户登录oracle
创建AQ,类型为JMS消息
Sql代码 复制代码
1. BEGIN
2. DBMS_AQADM.CREATE_QUEUE_TABLE(
3. Queue_table => 'gpsstatus_queue_table',
4. Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
5. multiple_consumers => false,
6. compatible => '8.1.5');
7. DBMS_AQADM.CREATE_QUEUE(
8. Queue_name => 'gpsstatus_queue',
9. Queue_table => 'gpsstatus_queue_table');
10. DBMS_AQADM.START_QUEUE(
11. queue_name => 'gpsstatus_queue');
12. END;
13. /
14. BEGIN
15. DBMS_STREAMS_ADM.SET_UP_QUEUE(
16. queue_table => 'gps_temp_queue_table',
17. queue_name => 'gps_temp_queue');
18. END;
19. /
Sql代码
1. BEGIN
2. DBMS_AQADM.CREATE_QUEUE_TABLE(
3. Queue_table => 'gpsstatus_queue_table',
4. Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
5. multiple_consumers => false,
6. compatible => '8.1.5');
7. DBMS_AQADM.CREATE_QUEUE(
8. Queue_name => 'gpsstatus_queue',
9. Queue_table => 'gpsstatus_queue_table');
10. DBMS_AQADM.START_QUEUE(
11. queue_name => 'gpsstatus_queue');
12. END;
13. /
14. BEGIN
15. DBMS_STREAMS_ADM.SET_UP_QUEUE(
16. queue_table => 'gps_temp_queue_table',
17. queue_name => 'gps_temp_queue');
18. END;
19. /
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
Queue_table => 'gpsstatus_queue_table',
Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
multiple_consumers => false,
compatible => '8.1.5');
DBMS_AQADM.CREATE_QUEUE(
Queue_name => 'gpsstatus_queue',
Queue_table => 'gpsstatus_queue_table');
DBMS_AQADM.START_QUEUE(
queue_name => 'gpsstatus_queue');
END;
/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'gps_temp_queue_table',
queue_name => 'gps_temp_queue');
END;
/
为目标表创建捕获进程
Sql代码 复制代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'capture',
5. streams_name => 'capture_gps',
6. queue_name => 'gps_temp_queue',
7. include_dml => true,
8. include_ddl => false);
9. END;
10. /
Sql代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'capture',
5. streams_name => 'capture_gps',
6. queue_name => 'gps_temp_queue',
7. include_dml => true,
8. include_ddl => false);
9. END;
10. /
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'capture',
streams_name => 'capture_gps',
queue_name => 'gps_temp_queue',
include_dml => true,
include_ddl => false);
END;
/
初始化scn
Sql代码 复制代码
1. DECLARE
2. iscn NUMBER; -- Variable to hold instantiation SCN value
3. BEGIN
4. iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
5. DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
6. source_object_name => 'myoracle.TEST_GPS_STATUS',
7. source_database_name => 'TESTdb',
8. instantiation_scn => iscn);
9. END;
10. /
11.
Sql代码
1. DECLARE
2. iscn NUMBER; -- Variable to hold instantiation SCN value
3. BEGIN
4. iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
5. DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
6. source_object_name => 'myoracle.TEST_GPS_STATUS',
7. source_database_name => 'TESTdb',
8. instantiation_scn => iscn);
9. END;
10. /
11.
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'myoracle.TEST_GPS_STATUS',
source_database_name => 'TESTdb',
instantiation_scn => iscn);
END;
/
为消息队列创建代理
Sql代码 复制代码
1. BEGIN
2. DBMS_AQADM.CREATE_AQ_AGENT(
3. agent_name => 'gpsstatus_agent');
4. DBMS_AQADM.ENABLE_DB_ACCESS(
5. agent_name => 'gpsstatus_agent',
6. db_username => 'strmadmin');
7. END;
8. /
9. DECLARE
10. subscriber SYS.AQ$_AGENT;
11. BEGIN
12. subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
13. SYS.DBMS_AQADM.ADD_SUBSCRIBER(
14. queue_name => 'strmadmin.gpsstatus_queue',
15. subscriber => subscriber,
16. rule => NULL,
17. transformation => NULL);
18. END;
19. /
Sql代码
1. BEGIN
2. DBMS_AQADM.CREATE_AQ_AGENT(
3. agent_name => 'gpsstatus_agent');
4. DBMS_AQADM.ENABLE_DB_ACCESS(
5. agent_name => 'gpsstatus_agent',
6. db_username => 'strmadmin');
7. END;
8. /
9. DECLARE
10. subscriber SYS.AQ$_AGENT;
11. BEGIN
12. subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
13. SYS.DBMS_AQADM.ADD_SUBSCRIBER(
14. queue_name => 'strmadmin.gpsstatus_queue',
15. subscriber => subscriber,
16. rule => NULL,
17. transformation => NULL);
18. END;
19. /
BEGIN
DBMS_AQADM.CREATE_AQ_AGENT(
agent_name => 'gpsstatus_agent');
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'gpsstatus_agent',
db_username => 'strmadmin');
END;
/
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.gpsstatus_queue',
subscriber => subscriber,
rule => NULL,
transformation => NULL);
END;
/
创建存储过程以决定将哪些信息放到消息队列里面
Sql代码 复制代码
1. CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
2. --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
3. message sys.aq$_jms_message;
4. enqueue_options dbms_aq.enqueue_options_t;
5. message_properties dbms_aq.message_properties_t;
6. msgid raw(16);
7. lcr SYS.LCR$_ROW_RECORD;
8. rc PLS_INTEGER;
9. DEVICEID varchar2(11);
10. GATHERDATETIME date;
11. LONGITUDETYPE char(1);
12. LONGITUDEVALUE number ;
13. LATITUDETYPE char(1);
14. LATITUDEVALUE number ;
15. SPEED number ;
16. DIRECTION number ;
17. BEGIN
18. rc := in_any.GETOBJECT(lcr);
19. DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
20. GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
21. LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
22. LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
23. LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
24. LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
25. SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
26. DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
27. message := sys.aq$_jms_message.construct(1);
28. --message.set_replyto(agent);
29. message.set_type('');
30. message.set_userid('strmadmin');
31. message.set_appid('');
32. message.set_groupid('');
33. message.set_groupseq('');
34. message.set_string_property('DEVICEID', DEVICEID);
35. message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
36. message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
37. message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
38. message.set_string_property('LATITUDETYPE', LATITUDETYPE);
39. message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
40. message.set_string_property('SPEED', to_char(SPEED) );
41. message.set_string_property('DIRECTION', to_char(DIRECTION) );
42. --指定消息生存时间
43. message_properties.expiration:=60;
44. dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
45. enqueue_options => enqueue_options,
46. message_properties => message_properties,
47. payload => message,
48. msgid => msgid);
49. COMMIT;
50. END;
51. /
Sql代码
1. CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
2. --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
3. message sys.aq$_jms_message;
4. enqueue_options dbms_aq.enqueue_options_t;
5. message_properties dbms_aq.message_properties_t;
6. msgid raw(16);
7. lcr SYS.LCR$_ROW_RECORD;
8. rc PLS_INTEGER;
9. DEVICEID varchar2(11);
10. GATHERDATETIME date;
11. LONGITUDETYPE char(1);
12. LONGITUDEVALUE number ;
13. LATITUDETYPE char(1);
14. LATITUDEVALUE number ;
15. SPEED number ;
16. DIRECTION number ;
17. BEGIN
18. rc := in_any.GETOBJECT(lcr);
19. DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
20. GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
21. LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
22. LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
23. LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
24. LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
25. SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
26. DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
27. message := sys.aq$_jms_message.construct(1);
28. --message.set_replyto(agent);
29. message.set_type('');
30. message.set_userid('strmadmin');
31. message.set_appid('');
32. message.set_groupid('');
33. message.set_groupseq('');
34. message.set_string_property('DEVICEID', DEVICEID);
35. message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
36. message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
37. message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
38. message.set_string_property('LATITUDETYPE', LATITUDETYPE);
39. message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
40. message.set_string_property('SPEED', to_char(SPEED) );
41. message.set_string_property('DIRECTION', to_char(DIRECTION) );
42. --指定消息生存时间
43. message_properties.expiration:=60;
44. dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
45. enqueue_options => enqueue_options,
46. message_properties => message_properties,
47. payload => message,
48. msgid => msgid);
49. COMMIT;
50. END;
51. /
CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
message sys.aq$_jms_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
DEVICEID varchar2(11);
GATHERDATETIME date;
LONGITUDETYPE char(1);
LONGITUDEVALUE number ;
LATITUDETYPE char(1);
LATITUDEVALUE number ;
SPEED number ;
DIRECTION number ;
BEGIN
rc := in_any.GETOBJECT(lcr);
DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
message := sys.aq$_jms_message.construct(1);
--message.set_replyto(agent);
message.set_type('');
message.set_userid('strmadmin');
message.set_appid('');
message.set_groupid('');
message.set_groupseq('');
message.set_string_property('DEVICEID', DEVICEID);
message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
message.set_string_property('LATITUDETYPE', LATITUDETYPE);
message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
message.set_string_property('SPEED', to_char(SPEED) );
message.set_string_property('DIRECTION', to_char(DIRECTION) );
--指定消息生存时间
message_properties.expiration:=60;
dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
COMMIT;
END;
/
为目标表配置处理器
Sql代码 复制代码
1.
2. BEGIN
3. DBMS_APPLY_ADM.SET_DML_HANDLER(
4. object_name => 'myoracle.TEST_GPS_STATUS',
5. object_type => 'TABLE',
6. operation_name => 'UPDATE', --可配置为 insert,update,delete等
7. error_handler => false,
8. user_procedure => 'strmadmin.enq_gps_lcr',
9. apply_database_link => NULL);
10. END;
11. /
Sql代码
1. BEGIN
2. DBMS_APPLY_ADM.SET_DML_HANDLER(
3. object_name => 'myoracle.TEST_GPS_STATUS',
4. object_type => 'TABLE',
5. operation_name => 'UPDATE', --可配置为insert,update,delete等
6. error_handler => false,
7. user_procedure => 'strmadmin.enq_gps_lcr',
8. apply_database_link => NULL);
9. END;
10. /
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'myoracle.TEST_GPS_STATUS',
object_type => 'TABLE',
operation_name => 'UPDATE', --可配置为insert,update,delete等
error_handler => false,
user_procedure => 'strmadmin.enq_gps_lcr',
apply_database_link => NULL);
END;
/
设定参数及启动捕获进程
Sql代码 复制代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'apply',
5. streams_name => 'apply_gps',
6. queue_name => 'strmadmin.gps_temp_queue',
7. include_dml => true,
8. include_ddl => false,
9. source_database => 'TESTdb');
10. END;
11. /
12. BEGIN
13. DBMS_APPLY_ADM.SET_PARAMETER(
14. apply_name => 'apply_gps',
15. parameter => 'disable_on_error',
16. value => 'n');
17. END;
18. /
19. BEGIN
20. DBMS_APPLY_ADM.START_APPLY(
21. apply_name => 'apply_gps');
22. END;
23. /
24. BEGIN
25. DBMS_CAPTURE_ADM.START_CAPTURE(
26. capture_name => 'capture_gps');
27. END;
28. /
Sql代码
1. BEGIN
2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(
3. table_name => 'myoracle.TEST_GPS_STATUS',
4. streams_type => 'apply',
5. streams_name => 'apply_gps',
6. queue_name => 'strmadmin.gps_temp_queue',
7. include_dml => true,
8. include_ddl => false,
9. source_database => 'TESTdb');
10. END;
11. /
12. BEGIN
13. DBMS_APPLY_ADM.SET_PARAMETER(
14. apply_name => 'apply_gps',
15. parameter => 'disable_on_error',
16. value => 'n');
17. END;
18. /
19. BEGIN
20. DBMS_APPLY_ADM.START_APPLY(
21. apply_name => 'apply_gps');
22. END;
23. /
24. BEGIN
25. DBMS_CAPTURE_ADM.START_CAPTURE(
26. capture_name => 'capture_gps');
27. END;
28. /
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'apply',
streams_name => 'apply_gps',
queue_name => 'strmadmin.gps_temp_queue',
include_dml => true,
include_ddl => false,
source_database => 'TESTdb');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_gps',
parameter => 'disable_on_error',
value => 'n');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_gps');
END;
/
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_gps');
END;
/
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
Java代码 复制代码
1.
2. QueueConnectionFactory queueConnectionFactory = null;
3. QueueConnection queueConnection = null;
4. QueueSession queueSession = null;
5.
6. Queue queue = null;
7. QueueReceiver subscriber = null;
8. Message message = null;
Java代码
1. QueueConnectionFactory queueConnectionFactory = null;
2. QueueConnection queueConnection = null;
3. QueueSession queueSession = null;
4.
5. Queue queue = null;
6. QueueReceiver subscriber = null;
7. Message message = null;
QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
Queue queue = null;
QueueReceiver subscriber = null;
Message message = null;
Java代码 复制代码
1.
2. log.info("开始连接 ");
3. queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
4. queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
5. log.info("创建Queue Connection 成功");
6. queueConnection.start();
7. log.info("connection started");
8. queueSession = queueConnection.createQueueSession(false,
9. Session.AUTO_ACKNOWLEDGE);
10. .info("Queue session created");
11. queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
12. log.info("Queue getted");
13. subscriber = queueSession.createReceiver(queue);
14. log.info("初始化完成");
Java代码
1. log.info("开始连接 ");
2. queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
3. queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
4. log.info("创建Queue Connection 成功");
5. queueConnection.start();
6. log.info("connection started");
7. queueSession = queueConnection.createQueueSession(false,
8. Session.AUTO_ACKNOWLEDGE);
9. .info("Queue session created");
10. queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
11. log.info("Queue getted");
12. subscriber = queueSession.createReceiver(queue);
13. log.info("初始化完成");
log.info("开始连接 ");
queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
log.info("创建Queue Connection 成功");
queueConnection.start();
log.info("connection started");
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
.info("Queue session created");
queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
log.info("Queue getted");
subscriber = queueSession.createReceiver(queue);
log.info("初始化完成");
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
Java代码 复制代码
1. while (true) {
2. message = subscriber.receive();//receive方法使没有新消息时,线程挂起
3. //do something...
4. }
5.
Java代码
1. while (true) {
2. message = subscriber.receive();//receive方法使没有新消息时,线程挂起
3. //do something...
4. }
5.
while (true) {
message = subscriber.receive();//receive方法使没有新消息时,线程挂起
//do something...
}
相关推荐
以及基于日志的CDC,如Debezium、Canal和Flink-CDC,后者通过解析数据库的日志来实时获取变更。Flink-CDC因其全增量一体化同步、分布式架构和强大的数据加工能力而备受青睐。 **二、为何使用CDC及适用场景** 随着...
在Oracle数据库环境中,实现Socket监听数据库推送信息是一种实时获取数据变化的方法。这通常涉及到数据库触发器、存储过程以及Java编程的结合使用。以下是对这个主题的详细解释: 1. **Oracle触发器**: - 触发器...
1. **配置Oracle CDC连接器**:Flink提供了JDBC connector,它可以配合第三方库如Oracle GoldenGate或Oracle LogMiner来获取Oracle CDC事件。你需要确保已安装了相应的Oracle JDBC驱动(如ojdbc.jar),并将该驱动...
- **研究方法**:首先使用ADO控件获取Oracle数据库中的数据集,然后通过调用TClientDataSet的`SaveToFile()`方法将其保存为指定格式的文件。这里选择XML格式进行保存。 - **示例**:以一个Oracle表`D_PORT`为例,...
Oracle GoldenGate 是一款高效的数据复制解决方案,主要用于实时数据同步和数据迁移。在Oracle数据库环境中,它提供了低延迟、高可用性的数据复制功能。本压缩包"Oracle_GoldenGate_11.2.1.0.3 for Oracle_11g_...
1. `elasticsearch-river-jdbc-1.4.0.10.jar`:这是Elasticsearch JDBC River插件的一个版本,它允许Elasticsearch从各种支持JDBC(Java Database Connectivity)的数据库中获取数据,包括Oracle。River是Elastic...
Oracle公司是Java的主要维护者,它不仅提供了Java开发工具包(JDK),还管理着Java平台标准版(Java SE)、企业版(Java EE)和微型版(Java ME)。SVN,全称Subversion,是一种版本控制系统,用于跟踪文件和目录的...
7. **参与用户组和其他资源**:加入Oracle用户组,利用社区资源,可以获取最新的技术动态和解决方案。 8. **建立标准和变更控制流程**:标准化数据库设计和管理流程,实施严格的变更控制,减少错误和风险。 9. **...
- 包括共享池、大型池、Java池、数据库缓冲区缓存和重做日志缓冲区等。 - 相较于数据库实例的SGA,ASM实例的SGA规模较小。 - **后台进程** - 处理与存储管理相关的任务,如数据同步、元数据维护等。 2. **安装...
- `syschange_on_install--`: 这可能是Oracle安装过程中某个步骤的命令或提示信息,用于记录安装过程中的某些变更。 - `systemmanager--PLSQLDeveloper`: “systemmanager”可能是指Oracle系统管理员账户,“PLSQL...
标题 "idea+springboot+oracle+mybatis" 暗示了这个项目是基于IntelliJ IDEA的一个集成开发环境,使用Spring Boot框架,配合Oracle数据库和MyBatis持久层框架来构建的应用。以下是对这些技术栈的详细解释: 1. **...
Oracle® Fusion Middleware Tutorial for Oracle Coherence 12c (12.1.2) 是为开发者和架构师准备的教程文档,提供了创建、配置和部署基于Oracle Coherence的Java应用的逐步实例。该教程的主要内容包括但不限于以下...
Oracle CDC(Change Data Capture)是一种高效的数据同步技术,主要用于在异构环境中实现实时数据捕捉、转换及传递。它通过监控数据库的日志文件来检测数据变化,并将这些变化数据进行捕获、记录和传输。此技术对于...
首先,关于数据库的设计与操作,Java项目通常会用到关系型数据库管理系统,如MySQL或Oracle。在这个过程中,我们需要学习SQL语言进行数据操作,包括创建表、插入数据、查询和更新等。此外,还会涉及数据库优化、事务...
### Oracle E-Business Suite Release Notes ...因此,在部署或升级Oracle E-Business Suite时,务必检查最新的官方文档以获取最新支持信息。此外,根据实际使用场景选择合适的操作系统版本和浏览器类型也非常重要。
通过监听数据库,可以实现实时的数据变更通知、事务处理、审计日志记录等功能,从而增强应用程序的健壮性和安全性。 ### `AbstractDAO`类分析 `AbstractDAO`类是Java中实现数据库操作的一个典型抽象类设计。它提供...
通过监听和解析MySQL的Binlog事件,我们可以获取到数据库的实时变更信息,进而实现增量数据的捕获。常见的开源工具如Canal、Maxwell等,就是基于此原理实现的。 在“springboot-binlog-main”这个项目中,很可能...
5. **监听和事件驱动**:一些数据库如MySQL的Binlog或Oracle的GoldenGate支持监听数据库日志,当有变更时触发事件,Java应用通过订阅这些事件来同步数据。 6. **第三方库**:如Flyway、Liquibase用于数据库版本管理...
JDBC是Sun Microsystems(现已被Oracle收购)推出的一种Java数据库连接规范,它提供了一种统一的API,让Java程序员能够在不同的数据库系统上编写数据库应用,无需关心底层数据库的差异。 8.0.25是该驱动的版本号,...