`
sheungxin
  • 浏览: 105879 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于oracle的增量数据采集实现总结

阅读更多
  • 项目打包方案
在“基于oracle的增量数据采集”一文中提出了基于触发器》物化视图》存储过程》java source》外部程序数据采集方案。本文初步对其进行了实现,利用maven-assembly-plugin进行打包,输出结构如下:bin、conf、lib,分别存放命令文件、配置文件、jar包,需注意在bin目录下命令文件中把conf、lib加入classpath中,详见start.bat、clear.bat
@echo off & setlocal enabledelayedexpansion

set LIB_JARS=""
cd ..\lib
for %%i in (*) do set LIB_JARS=!LIB_JARS!;..\lib\%%i
cd ..\bin

java -Xms64m -Xmx1024m -classpath ..\conf;%LIB_JARS% com.service.data.sync.oracle.producer.SyncDataEnv init
goto end

:end
pause
目前仅配置命令文件start.bat、clear.bat,start.bat:启动初始化数据同步环境(创建java source、存储过程、物化视图、触发器),clear.bat:清除数据同步环境(删除java source、存储过程、物化视图、触发器),clear.bat只需把上述命令中init替换为clear即可

maven-jar-plugin,也是一个maven打包插件,可加入lib依赖、指定启动类等。使用java -jar 【jar包】 【args】 命令启动,本项目使用assembly即可

  • 需要注意事项
a、配置文件中指定需要同步的表,执行start.bat自动为每个表建立物化视图和触发器,已存在忽略
b、目前不支持二进制、单条数据过长,因为在触发器中拼接的数据类型为varchar2,限制4000;需要特殊处理,未验证
c、向外发布数据应采用异步发送方案,否则会影响业务库的数据提交。java source执行完,业务库中的事务方可提交成功,已验证
d、触发器创建在物化视图的原因在于,在表上建立触发器,未提交已触发。物化视图在未提交前存在物化视图日志中,不会触发
e、尽量减少jar的使用,避免oracle导入jar过多
f、检查是否安装OracleJVM,用sys用户执行"select * from dba_registry where comp_id='JAVAVM'",没有记录表示未安装,用database configuration assistant安装java组件或者执行$ORACLE_HOME/javavm/install/initjvm.sql脚本
g、在初始化数据同步环境前需在oracle中添加java source所依赖的jar包,目前最简方案把本项目中AbstractSend、HttpUrlSend、SyncDataRunner三个类打包,添加到oracle中;仅使用HttpURLConnection向外发数据,不依赖任何java环境外的jar,仅用于测试;正式环境应考虑使用其它异步发送方案,例如消息队列
loadjava -r -f -verbose -resolve -user username/password xxx.jar
loadjava -r -f -user username/password xxx.class
dropjava -r -f -user [option_list] file_list
该命令需要直接在数据库本机的命令行中执行,不能在plsql中执行

  • 代码实现
package com.service.data.sync.oracle.producer.send;

/**
 * 把监控获取数据向外发送,
 * @author sheungxin
 *
 */
public abstract class AbstractSend {
	
	/**
	 * 发送消息
	 * @param message
	 */
	public void send(String message){
		beforeSend(message);
		exeSend(message);
		afterSend(message);
	}
	
	/**
	 * 发送消息前执行
	 * @param message
	 */
	private void beforeSend(String message){
		//do something
	}
	
	/**
	 * 实际发送消息实现类
	 * @param message
	 */
	public abstract boolean exeSend(String message);

	/**
	 * 发送消息后执行
	 * @param message
	 */
	private void afterSend(String message){
		//do something
	}
}
消息发送抽象类,把监控的数据变化信息发送出去,可在发送前后做一些操作
package com.service.data.sync.oracle.producer.send;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

/**
 * 使用HttpURLConnection向外发送消息,GET方式,消息有长度限制,仅用于测试
 * @author sheungxin
 *
 */
public class HttpUrlSend extends AbstractSend{
	
	private static final String sendPath="http://192.168.19.99:8181/jeesite/f/list-7.html?params=";
			
	public boolean exeSend(String message){
		boolean flag=true;
		try{
			URL url = new URL(sendPath);
			HttpURLConnection conn = (HttpURLConnection) url.openConnection();
			conn.setRequestMethod("GET");
			conn.setDoInput(true);
			InputStream is = conn.getInputStream();
			is.close();
		}catch(Exception e){
			e.printStackTrace();
			flag=false;
		}
		return flag;
	}

}
本消息发送实现类仅用于测试,不建议使用。在注意事项中提到过,需改用异步发送机制,避免对业务库事物提交产生影响
package com.service.data.sync.oracle.producer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.service.data.sync.oracle.producer.send.AbstractSend;
import com.service.data.sync.oracle.producer.send.HttpUrlSend;

/**
 * 数据同步执行类
 * @author sheungxin
 *
 */
public class SyncDataRunner {
	
	private static final BlockingQueue<String> taskQueue=new LinkedBlockingQueue<String>();
	
	/**
	 * 向队列中添加待发送的消息,队列满后返回false,LinkedBlockingQueue默认Integer.MAX_VALUE
	 * @param message
	 */
	public static boolean addTask(String message){
		return taskQueue.offer(message);
	}
	
	/**
	 * 消费队列中数据,向外发送消息
	 */
	public static void executeTask(){
		AbstractSend httpSend=new HttpUrlSend();
		while(true){
			String message;
			try {
				//堵塞获取待发送消息
				message = taskQueue.take();
				httpSend.send(message);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
数据同步实际操作类,仅提供两个方法,addTask用于把监控到的数据放入队列中,在oracle中的java source代码中调用;executeTask用于把队列中的消息向外发送,此处仅单线程向外发送,可考虑多线程,但需注意对oracle性能的影响。具体根据业务情况,单线程+MQ应该也可以满足业务需求。另外,消息存放在队列中,异常情况下有可能会丢失,需要针对可能发生的异常进行特殊处理

关于java代码操作java source、存储过程、物化视图、触发器,这里就不贴代码,主要介绍遇到的一些坑。
1、在创建触发器时,拼接数据需要用到:old、:new,分别为触发事件之前和之后的对象,一直报错,无法创建成功。但是把sql拷贝到plsql可正常执行,通过排除法定位是“:old、:new”引起的。直接使用的JDBC,替换PrepareStatement改用Statement后可以了,PrepareStatement会进行预编译,一直不通过
2、创建java source时,一直异常,也是拷贝到plsql中可成功执行。最后解决方案:Statement.setEscapeProcessing(false),如果参数为true,则驱动程序在把SQL语句发给数据库前进行转义替换,否则让数据库自己处理

  • 其它思路
add2ws 写道
用得着这么麻烦么,直接用kettle插入更新,配合oracle触发器省时省力
根据上述意见,调研了kettle的增量同步方案:
1、全量比对取增量
2、使用时间戳进行数据增量更新
3、使用触发器+快照表进行数据增量更新
方案1,全量比对性能肯定不佳;方案2,需要时间戳,原有业务库不一定支持;方案3,使用快照表记录数据变化情况。网上有部分方案提出:快照表仅记录修改、删除操作,定时删除目标表在快照表中有的数据,再复制源表中没有的数据。这样删除修改数据再添加,有些业务不可取。这样的好处我的理解是可以批量处理数据,若根据操作类型逐条处理效率过低。所以我们需要根据实际业务场景去均衡实时性、性能、一致性、复杂度等去制定方案。
所以上文中的实现方案即替换为:触发器+快照表+快照表扫描程序(也可考虑用kettle),好处在于与数据库耦合度降低,但实时性、效率可能就差一些

以下这篇帖子对物化视图和ETL进行了讨论,适合自己即可,可以参考下:
引用
0
0
分享到:
评论
2 楼 sheungxin 2017-02-27  
@add2ws 我试下,多谢!
1 楼 add2ws 2017-02-27  
用得着这么麻烦么,直接用kettle插入更新,配合oracle触发器省时省力

相关推荐

    大数据采集技术-大数据采集技术概述.pptx

    大数据采集技术是现代信息技术领域的重要组成部分,它涉及从各种数据源获取海量数据,并将这些数据转化为可供分析和处理的形式。本篇文章将详细探讨大数据采集的主要技术及其应用。 首先,我们来了解一下大数据的...

    基于Rman和TSM的Oracle数据库备份方法研究.pdf

    本文的研究结果表明,基于Rman和TSM的Oracle数据库备份方案是一种安全高效的数据备份解决方案,对于气象数据采集和存储具有重要的参考价值。 知识点: 1. 数据库备份的重要性:数据库备份是数据库管理中非常重要的...

    基于 Flink CDC 的现代数据栈.pdf

    Flink CDC 在现代数据栈中的作用是作为数据采集和转换的工具,通过 Flink CDC,可以高效实现海量数据的实时集成,并将数据传输到下游的存储组件中。 Flink CDC 是一个强大的数据集成框架,可以高效实现海量数据的...

    阿里巴巴大数据之路——数据技术篇.pdf

    而TimeTunnel则用于实时数据同步,基于数据库的日志,如MySQL的bin-log和Oracle的归档日志,实现增量数据的实时传输,其本质上是一个消息中间件。 在数据同步过程中,一个常见的挑战是如何合并增量数据与全量数据。...

    logstash采集数据库数据插件

    `schedule` 用 cron 表达式定义了执行频率,`use_column_value` 和 `tracking_column` 用于实现基于时间戳的增量查询,确保每次只获取自上次查询以来的新数据。 标签 "logstash" 和 "logstash-inp" 暗示了这个插件...

    NIFI 模板实例

    NIFI使用请参考:Apache NIFI 安装 ● 操作 ● 文件同步 ● oracle 数据库增量同步实例讲解... 基于WEB图形界面,通过拖拽、连接、配置完成基于流程的编程,实现数据采集、处理等功能。

    Mysql到hdfs增量同步实验手册.pdf

    reader部分设置MySQL连接信息和基于时间戳的查询SQL,用于筛选出需要同步的增量数据。writer部分设置HDFS的存储路径、文件类型、列信息以及写入模式。 - JSON配置文件中的`${id}`是一个动态参数,运行时通过命令行...

    基于AnalyticDB for MySQL构建实时数据仓库.pdf

    - **增量同步**: 通过数据迁移或跨库SQL等方式实现增量数据的同步。 #### 五、总结 通过以上介绍可以看出,利用AnalyticDB for MySQL构建实时数据仓库不仅可以提高数据处理的速度和效率,还能降低运维成本,增强...

    劳动与社会保障数据交换平台技术方案.doc

    1. 建立数据采集平台,实现下级业务系统数据向数据中心的实时更新和转换。该平台应能隔离底层系统变化对资源库的影响,确保数据质量不受系统升级或格式变化等影响。 2. 实现数据分析和综合决策支持。基于资源库的...

    某金融保险数据中心基于机器学习的智能运维经验分享-最佳实践.docx

    该项目的主要创新点包括:基于数据特色定制模型,例如Oracle日志数据数据库监控平台的日志数据通过间隔固定时间的快照方式被采集,诸如SQL执行时间、CPU时间增量、集群等待时间等反映数据库性能的多项指标按时刻被...

    Oracle数据库实验操作

    ##### 采集数据 - **实验131:** 使用`UTLBSTAT`和`UTLESTAT`工具进行优化。 - **实验132:** 使用`SPREPORT`工具进行优化。 - **实验133:** 使用`DBMS_JOB`包维护作业。 ##### 共享池优化 - **实验134:** SQL...

    大数据基础知识.docx

    在大数据的基础知识中,我们通常关注四大核心环节:大数据采集、大数据预处理、大数据存储和大数据分析。以下是对这些环节的详细阐述: 一、大数据采集 大数据采集是大数据生命周期的起点,它涵盖了从各种源头获取...

    基于大数据环境下Python的爬虫技术的应用.pdf

    通用爬虫技术主要抓取网络上的各种信息,适合于广泛的数据采集。而聚焦网络爬虫技术则针对特定主题或用户需求,只抓取与主题相关的内容,因而更高效、更有针对性。 增量式爬虫技术的应用,可以避免重复抓取已有数据...

    接口层实时采集工具OGG介绍.pptx

    总结起来,Oracle GoldenGate是一个强大的工具,用于实现实时数据集成和复制,尤其在大数据环境下,其性能和灵活性使其成为企业级数据同步的关键组件。了解和掌握OGG的相关概念、配置方法以及常用命令,对于管理和...

    藏经阁-PostgresChina2018_樊文凯_ORACLE数据库和应用异构迁移最佳实践.pdf

    3. 数据库增量、全量迁移:ADAM可以对数据库进行增量、全量迁移,确保数据的一致性和完整性。 4. 数据库系统优化:ADAM可以对数据库系统进行优化,提高数据库的性能和可扩展性。 5. PL/SQL转Java应用迁移:ADAM可以...

    hadoop项目--网站流量日志分析--5.docx

    在这个过程中,数据采集是一个重要的步骤,虽然在某些情况下对数据采集的可靠性要求可能不是特别严格,但理解如何高效地采集和处理数据仍然是至关重要的。 Apache Sqoop是Hadoop生态系统中的一个工具,专门用于在...

Global site tag (gtag.js) - Google Analytics