`
jiayj198609
  • 浏览: 149825 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

java实现高性能的数据同步

    博客分类:
  • Java
 
阅读更多
     最近在做一个银行的生产数据脱敏系统,今天写代码时遇到了一个“瓶颈”,脱敏系统需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台 Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路 如图:



    首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。
    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

在查询的时候我用了如下语句
String queryStr = "SELECT * FROM xx";
ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。
相关代码

package com.dlbank.domain;  
  
import java.sql.Connection;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.Statement;  
import java.util.List;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.apache.log4j.Logger;  
  
/** 
 *<p>title: 数据同步类 </p>   
 *<p>Description: 该类用于将生产核心库数据同步到开发库</p>   
 *@author Tank Zhang  
 */  
public class CoreDataSyncImpl implements CoreDataSync {  
      
    private List<String> coreTBNames; //要同步的核心库表名  
    private ConnectionFactory connectionFactory;  
    private Logger log = Logger.getLogger(getClass());  
      
    private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数  
      
    private int syncThreadNum;  //同步的线程数  
  
    @Override  
    public void syncData(int businessType) throws Exception {  
          
        for (String tmpTBName : coreTBNames) {  
            log.info("开始同步核心库" + tmpTBName + "表数据");  
            // 获得核心库连接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            //为每个线程分配结果集  
            ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
            coreRs.next();  
            //总共处理的数量  
            long totalNum = coreRs.getLong(1);  
            //每个线程处理的数量  
            long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
            log.info("共需要同步的数据量:"+totalNum);  
            log.info("同步线程数量:"+syncThreadNum);  
            log.info("每个线程可处理的数量:"+ownerRecordNum);  
            // 开启五个线程向目标库同步数据  
            for(int i=0; i < syncThreadNum; i ++){  
                StringBuilder sqlBuilder = new StringBuilder();  
                //拼装后SQL示例  
                //Select * From dms_core_ds Where id between 1 And 657398  
                //Select * From dms_core_ds Where id between 657399 And 1314796  
                //Select * From dms_core_ds Where id between 1314797 And 1972194  
                //Select * From dms_core_ds Where id between 1972195 And 2629592  
                //Select * From dms_core_ds Where id between 2629593 And 3286990  
                //..  
                sqlBuilder.append("Select * From ").append(tmpTBName)  
                        .append(" Where id between " ).append(i * ownerRecordNum +1)  
                        .append( " And ")  
                        .append((i * ownerRecordNum + ownerRecordNum));  
                Thread workThread = new Thread(  
                        new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
                workThread.setName("SyncThread-"+i);  
                workThread.start();  
            }  
            while (currentSynCount.get() < totalNum);  
            //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);  
            //Thread.sleep(1000 * 3);  
            log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");  
        }  
    }// end for loop  
      
    public void setCoreTBNames(List<String> coreTBNames) {  
        this.coreTBNames = coreTBNames;  
    }  
  
    public void setConnectionFactory(ConnectionFactory connectionFactory) {  
        this.connectionFactory = connectionFactory;  
    }  
      
    public void setSyncThreadNum(int syncThreadNum) {  
        this.syncThreadNum = syncThreadNum;  
    }  
      
    //数据同步线程  
    final class WorkerHandler implements Runnable {  
        ResultSet coreRs;  
        String queryStr;  
        int businessType;  
        String targetTBName;  
        public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
            this.queryStr = queryStr;  
            this.businessType = businessType;  
            this.targetTBName = targetTBName;  
        }  
        @Override  
        public void run() {  
            try {  
                //开始同步  
                launchSyncData();  
            } catch(Exception e){  
                log.error(e);  
                e.printStackTrace();  
            }  
        }  
        //同步数据方法  
        void launchSyncData() throws Exception{  
            // 获得核心库连接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            // 获得目标库连接  
            Connection targetConn = connectionFactory.getDMSConnection(businessType);  
            targetConn.setAutoCommit(false);// 设置手动提交  
            PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
            ResultSet coreRs = coreStmt.executeQuery(queryStr);  
            log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
            int batchCounter = 0; //累加的批处理数量  
            while (coreRs.next()) {  
                targetPstmt.setString(1, coreRs.getString(2));  
                targetPstmt.setString(2, coreRs.getString(3));  
                targetPstmt.setString(3, coreRs.getString(4));  
                targetPstmt.setString(4, coreRs.getString(5));  
                targetPstmt.setString(5, coreRs.getString(6));  
                targetPstmt.addBatch();  
                batchCounter++;  
                currentSynCount.incrementAndGet();//递增  
                if (batchCounter % 10000 == 0) { //1万条数据一提交  
                    targetPstmt.executeBatch();  
                    targetPstmt.clearBatch();  
                    targetConn.commit();  
                }  
            }  
            //提交剩余的批处理  
            targetPstmt.executeBatch();  
            targetPstmt.clearBatch();  
            targetConn.commit();  
            //释放连接   
            connectionFactory.release(targetConn, targetPstmt,coreRs);  
        }  
    }  
}  


  • 大小: 14.5 KB
分享到:
评论
35 楼 gaobo424 2010-11-26  
你是在哪家银行哦,有点搞扯,居然允许你连接它的核心数据库???你要是发一条Update语句呢?我做银行的项目都N久了,做过建行、商业银行的项目,谁敢把核心数据给你直连???顶多只能给你备过份出来,你再导入还差不多
34 楼 zhys513 2010-11-26  
迁移数据的话,PB也有个可以做不同数据库的对接(数据管道)。迁移不是很快,但操作很简单就是了。
33 楼 zhys513 2010-11-26  
jiayj198609 写道
yjwxfpl 写道
PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");

你同步的表结构都只有五个字段  ,targetPstmt.setString()  类型全都是String?



那具体的表结构当然是看你具体的操作了;我的表只有五个字段;当然就set五个了;难道还要set6个?



用户set和用SQL拼接是有性能的差别的!既然是考虑大数据的迁移,所以对每条语句的性能都是要考虑到滴。
32 楼 LikeEJB_CC 2010-11-26  
一次1000条,还多线程,IO怎么解决
频繁的少量写入,不如一次大批量的写入.
31 楼 sunlong 2010-11-26  
线程处理上并不是很好吧,比如while (currentSynCount.get() < totalNum);你这是让主线程运行空语句,太浪费了
30 楼 smithfox 2010-11-26  
首先, 如果楼主真的是转载别人的文章, 需要加引说原文. 这种直接抄袭的做法, 我们的robbin哥是最鄙视的

yizhilong28 写道
1) CLOB/BLOB字段
2) 一次性查询过多数据, 遇到 "ORA-1013", "ORA-1555",或堆栈溢出
3) insert时产生主键冲突

做过一同步的项目,1,2,3都遇到了,用java基本没解,头疼



这种数据库的手动复制用在一次性工作善可, 如果是一个长期使用的tools或是服务, 是很成问题的.

我所做的系统遇到了很多的问题, 主要有

1) CLOB/BLOB字段, 只比较长度, 不比较实际内容, 并且用ojdbc6.jar(最新oracle驱动fix了不少bug, 并且支持JDBC4标准, 所以用CLOB/ BLOG字段的支持更好

2) 一次性查询过多数据, 遇到 "ORA-1013", "ORA-1555": 需要对查询进行分段(分段逻辑需要针对不同的数据库定制, 一般情况是对类似lastmodifiedtime字段, 以时间分段

3) insert产生冲突, 或是其他的异常, 需要有非常好的异常处理框架, 因为JDBC的异常只有SQLException, 不能区分是连接异常, 表级异常, 还是行级异常. 并且要做好出异常的尝试机制. 只有这样才能避免因为中间异常而出现的中途退出和进入死循环.  要知道真正的产品上一次复制可能是好几天, 没有人能一直盯着看. 程序必须有很好的健壮性.

4) 对第三的补充, 尽管JDBC4标准已经对SQLException做了很好的改进, 但是各大数据库产商支持太差.
29 楼 吾日三省吾身 2010-11-26  
尊重人家版权嘛 转载的时候要说明哦
28 楼 sheep3600 2010-11-26  
codermouse 写道
最近刚在做一个同步项目,和楼主要求的很像。思路也和楼主要求的差不多。
不过说实话,不赞成用JAVA做数据同步,我再做的时候就出现很多不方便处理的问题,总是通过绕来绕去来解决。
可惜本人只会JAVA(而且还不是精通),其它的都不会,DBLINK也是开始做这个同步项目后才知道的,不过DBLINK用了一段时间,觉得很不稳定。也不赞成用。
如果有DBA,或者我是DBA,我想数据同步是个很容易解决的问题。
哎。。。感叹程序员要学的东西太多。
PS1:这是转贴?
PS2:JAVAEYE重新开张,数据丢失了,怎么还要我做发帖测试?还是经过前2天的事故,要求更进一步了?
PS3:恭喜JAVAEYE重新开张,不好意思,话有点过多了。



这种事情DBA估计不到10分钟就全搞定了,其实这种事情就该DBA去搞,各有所属,各有专攻嘛。
生产库别说select了,连ip我们都不知道,所以这种事还是不掺和了。
27 楼 renpeng301 2010-11-26  
sunnylocus 写道
jiayj198609 写道
imacback 写道
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。

呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了!

老兄,你怎么随便拿我的文章到处贴,您也在大连银行工作?不过我好象没有见过你

真的是转的啊
26 楼 85977328 2010-11-26  
直接用dblink
25 楼 bitray 2010-11-26  
sunnylocus 写道
jiayj198609 写道
imacback 写道
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。

呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了!

老兄,你怎么随便拿我的文章到处贴,您也在大连银行工作?不过我好象没有见过你



哈,直接遇到原作者就太尴尬了,不过老兄你也别介意。现在到处都是那种超级年轻的软件工程师。总的给他们一点点信心啊,哈哈
24 楼 xhdwell 2010-11-26  
用jdbc效率肯定不行的。建议通过动态组建脚本,使用脚本执行COPY
23 楼 pikenlike_123 2010-11-25  
我记得有个数据抽取/数据转换工具叫 ODI  
      ODI最大的特点是特征是提出了知识模块的概念(Knowledge Module)。
   可以百度一下..
22 楼 houzidexinsheng 2010-11-25  
去年我写了一个应用系统,不过场景和你有区别,我们主要是要同步基础数据,对需要同步的基础数据做版本控制,发布的时候直接把开发库的基础数据增量更新到生产库,如果需要的话,可以交流一下!
21 楼 yizhilong28 2010-11-25  
1) CLOB/BLOB字段
2) 一次性查询过多数据, 遇到 "ORA-1013", "ORA-1555",或堆栈溢出
3) insert时产生主键冲突

做过一同步的项目,1,2,3都遇到了,用java基本没解,头疼

20 楼 gwpking8419 2010-11-25  
ETL工具呗
19 楼 codermouse 2010-11-25  
最近刚在做一个同步项目,和楼主要求的很像。思路也和楼主要求的差不多。
不过说实话,不赞成用JAVA做数据同步,我再做的时候就出现很多不方便处理的问题,总是通过绕来绕去来解决。
可惜本人只会JAVA(而且还不是精通),其它的都不会,DBLINK也是开始做这个同步项目后才知道的,不过DBLINK用了一段时间,觉得很不稳定。也不赞成用。
如果有DBA,或者我是DBA,我想数据同步是个很容易解决的问题。
哎。。。感叹程序员要学的东西太多。
PS1:这是转贴?
PS2:JAVAEYE重新开张,数据丢失了,怎么还要我做发帖测试?还是经过前2天的事故,要求更进一步了?
PS3:恭喜JAVAEYE重新开张,不好意思,话有点过多了。
18 楼 jiasky 2010-11-25  
你的这种思路人们用来做了很多事,
1个西瓜1个人吃了1分钟,
现在分成10个人,每个人6秒钟就吃完了,呵呵...
17 楼 icanfly 2010-11-25  
如果,我是说如果,你在中转复制的时候突然你的中转器(java实现的)挂了,当你恢复中转的时候,怎么处理?有容错吗?
16 楼 liliugen 2010-11-25  

天~\(≧▽≦)/~啦啦啦,这是银行系统啊 。关链接都不在finally里面的啊,这样的银行系统敢用么。。
 

相关推荐

    java多个数据库实现数据同步

    本文将深入探讨如何使用Java来实现多个数据库之间的数据同步。 首先,我们需要理解数据同步的含义。数据同步是指在两个或多个数据库之间,当某个数据库中的数据发生改变时,这些变化能够被实时或者近实时地反映到...

    java实现两个mysql同步主库的数据

    下面我们将深入探讨如何使用Java实现两个MySQL数据库之间的数据同步,以及涉及的相关知识点。 首先,我们需要理解MySQL的复制原理。MySQL的主从复制是基于日志的,主库上的所有更改都会被记录到二进制日志(binlog...

    SpringBoot定时任务实现Oracle和mysql数据同步

    Spring Boot作为Java领域的一个热门微服务框架,提供了强大的定时任务功能,能够帮助我们实现不同数据库间的数据同步,比如Oracle到MySQL。本篇文章将详细讲解如何利用Spring Boot的定时任务特性,结合Java的相关...

    mysql-oracle数据同步

    本文将深入探讨MySQL到Oracle的数据同步过程,以及如何在同步过程中修改MySQL数据表的标识。 首先,我们要理解MySQL和Oracle是两种不同的关系型数据库管理系统(RDBMS)。MySQL以其开源、轻量级、高效的特点被广泛...

    JAVA 线程实现数据库的主从同步更新

    在Java编程环境中,数据库的主从同步更新是一个重要的任务,特别是在分布式系统或者高可用性架构中,确保数据的一致性和完整性至关重要。这个过程通常涉及到主数据库的写操作和从数据库的读操作,以及两者间的实时...

    Java编程实现同步序列密码的加密解密系统

    总结来说,Java编程实现的同步序列密码加密解密系统是一种高效且灵活的解决方案,适用于需要实时加密大量数据的场景。理解其工作原理和实现细节,对于提升软件的安全性和合规性至关重要。在开发过程中,应始终关注...

    Java高性能编程

    ### Java高性能编程——构建高可用系统的关键技术与实践 #### 引言 随着互联网技术的飞速发展,用户对服务的稳定性和响应速度提出了更高的要求。对于任何IT系统而言,“高可用性”(High Availability, HA)已经...

    java实现csv导出千万级数据实例

    本实例聚焦于“java实现csv导出千万级数据实例”,旨在提供一个高效、稳定的解决方案,避免因数据量过大而导致的性能问题,如Java中的栈溢出(Stack Overflow)。CSV(Comma Separated Values)格式因其简单、通用性...

    canal实现mysql到ES数据实时同步

    - 观察Canal Server和Adapter的日志,确认数据同步是否正常。 - 使用Elasticsearch的查询语句验证数据是否已正确写入。 在实际应用中,还需要考虑一些高级特性,如数据过滤(只同步部分表或部分列)、数据一致性...

    Java实现拖拽列表项的排序功能

    在Java中,我们可能使用JavaFX或Swing来实现这样的功能。对于JavaFX,我们可以监听`onDragDetected`、`onDragEntered`、`onDragExited`、`onDragDropped`和`onDragDone`事件。以下是一个简化的JavaFX示例: ```java...

    深入学习Java同步机制中的底层实现

    Java同步机制是多线程编程中确保数据一致性与正确性的关键。在Java中,主要有两种同步机制:内置的`synchronized`关键字以及基于`java.util.concurrent`包中的高级同步工具类。本文将深入探讨这些机制的底层实现,...

    Java对象缓存系统的实现,实现了LRU算法,并可以进行集群同步

    在IT行业中,缓存系统是优化应用程序性能的关键技术之一,特别是在大数据量和高并发的场景下。本项目实现了一个基于Java的对象缓存系统,其中包含了LRU(Least Recently Used)算法,以及支持集群同步功能。这里我们...

    关于java实现群聊和同步画图小结。

    在Java编程领域,实现群聊和同步画图是两个具有挑战性的任务,它们涉及到网络通信、多线程、图形用户界面(GUI)以及数据同步等多个关键知识点。这篇博客的作者通过分享自己的实践经验和代码示例,为我们揭示了如何...

    java获取电子称重量数据

    综上所述,Java获取电子称重量数据涉及了Java串口通信、通信协议的理解与实现、异常处理和性能优化等多个方面的知识。通过正确的技术选型和严谨的编程实践,我们可以构建可靠的系统来无缝地集成电子秤设备,实现称重...

    mysql历史数据同步到clickhouse 已测试

    ClickHouse是一个高性能的列式数据库管理系统(Column-Oriented DBMS),适用于在线分析处理(OLAP)和实时数据分析。MySQL作为流行的开源关系型数据库,广泛应用于事务处理和实时数据存储。在需要对历史数据进行...

    java实现歌词同步显示

    Java 实现歌词同步显示涉及到的是音乐播放器中的一个重要功能,即如何将歌词与正在播放的音乐进行精确匹配,为用户提供良好的听歌体验。在本文中,我们将深入探讨如何使用Java来实现这一功能,主要涉及以下几个关键...

    java实现基于netty 的udp字节数据接收服务

    在Java编程环境中,Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于构建可伸缩、高并发的服务器。本示例关注的是如何利用Netty实现一个基于UDP(User Datagram Protocol)的数据接收服务,这在需要进行...

    基于Java实现的基于netty轻量的高性能分布式RPC服务框架

    【作品名称】:基于Java实现的基于netty轻量的高性能分布式RPC服务框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】...

    NettySocket同步数据获取实现

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 实现...Netty的灵活性和高性能使得这些功能的实现变得简单而高效。

    基于java开发的、高性能的、基于解析mysql row base binlog技术实现.zip

    总结来说,基于Java的MySQL Row Base Binlog技术实现是构建高效数据同步系统的关键组件,它能够帮助开发者实现实时、精确且高性能的数据流动,广泛应用于各种分布式和大数据场景。通过深入理解并优化这个技术,可以...

Global site tag (gtag.js) - Google Analytics