锁定老帖子 主题:java实现高性能的数据同步
该帖已经被评为隐藏帖
|
|
---|---|
作者 | 正文 |
发表时间:2010-11-25
最后修改:2010-11-25
首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。 假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。 在查询的时候我用了如下语句 String queryStr = "SELECT * FROM xx"; ResultSet coreRs = PreparedStatement.executeQuery(queryStr); 实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。 相关代码 package com.dlbank.domain; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; /** *<p>title: 数据同步类 </p> *<p>Description: 该类用于将生产核心库数据同步到开发库</p> *@author Tank Zhang */ public class CoreDataSyncImpl implements CoreDataSync { private List<String> coreTBNames; //要同步的核心库表名 private ConnectionFactory connectionFactory; private Logger log = Logger.getLogger(getClass()); private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数 private int syncThreadNum; //同步的线程数 @Override public void syncData(int businessType) throws Exception { for (String tmpTBName : coreTBNames) { log.info("开始同步核心库" + tmpTBName + "表数据"); // 获得核心库连接 Connection coreConnection = connectionFactory.getDMSConnection(4); Statement coreStmt = coreConnection.createStatement(); //为每个线程分配结果集 ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName); coreRs.next(); //总共处理的数量 long totalNum = coreRs.getLong(1); //每个线程处理的数量 long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum)); log.info("共需要同步的数据量:"+totalNum); log.info("同步线程数量:"+syncThreadNum); log.info("每个线程可处理的数量:"+ownerRecordNum); // 开启五个线程向目标库同步数据 for(int i=0; i < syncThreadNum; i ++){ StringBuilder sqlBuilder = new StringBuilder(); //拼装后SQL示例 //Select * From dms_core_ds Where id between 1 And 657398 //Select * From dms_core_ds Where id between 657399 And 1314796 //Select * From dms_core_ds Where id between 1314797 And 1972194 //Select * From dms_core_ds Where id between 1972195 And 2629592 //Select * From dms_core_ds Where id between 2629593 And 3286990 //.. sqlBuilder.append("Select * From ").append(tmpTBName) .append(" Where id between " ).append(i * ownerRecordNum +1) .append( " And ") .append((i * ownerRecordNum + ownerRecordNum)); Thread workThread = new Thread( new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName)); workThread.setName("SyncThread-"+i); workThread.start(); } while (currentSynCount.get() < totalNum); //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作); //Thread.sleep(1000 * 3); log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据"); } }// end for loop public void setCoreTBNames(List<String> coreTBNames) { this.coreTBNames = coreTBNames; } public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public void setSyncThreadNum(int syncThreadNum) { this.syncThreadNum = syncThreadNum; } //数据同步线程 final class WorkerHandler implements Runnable { ResultSet coreRs; String queryStr; int businessType; String targetTBName; public WorkerHandler(String queryStr,int businessType,String targetTBName) { this.queryStr = queryStr; this.businessType = businessType; this.targetTBName = targetTBName; } @Override public void run() { try { //开始同步 launchSyncData(); } catch(Exception e){ log.error(e); e.printStackTrace(); } } //同步数据方法 void launchSyncData() throws Exception{ // 获得核心库连接 Connection coreConnection = connectionFactory.getDMSConnection(4); Statement coreStmt = coreConnection.createStatement(); // 获得目标库连接 Connection targetConn = connectionFactory.getDMSConnection(businessType); targetConn.setAutoCommit(false);// 设置手动提交 PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)"); ResultSet coreRs = coreStmt.executeQuery(queryStr); log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr); int batchCounter = 0; //累加的批处理数量 while (coreRs.next()) { targetPstmt.setString(1, coreRs.getString(2)); targetPstmt.setString(2, coreRs.getString(3)); targetPstmt.setString(3, coreRs.getString(4)); targetPstmt.setString(4, coreRs.getString(5)); targetPstmt.setString(5, coreRs.getString(6)); targetPstmt.addBatch(); batchCounter++; currentSynCount.incrementAndGet();//递增 if (batchCounter % 10000 == 0) { //1万条数据一提交 targetPstmt.executeBatch(); targetPstmt.clearBatch(); targetConn.commit(); } } //提交剩余的批处理 targetPstmt.executeBatch(); targetPstmt.clearBatch(); targetConn.commit(); //释放连接 connectionFactory.release(targetConn, targetPstmt,coreRs); } } } 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-11-25
为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。
|
|
返回顶楼 | |
发表时间:2010-11-25
imacback 写道 为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。 呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了! |
|
返回顶楼 | |
发表时间:2010-11-25
...没有DBA吧。。。说实话用java辅助导入是程序员最不爱干的,不妨试试语句处理
|
|
返回顶楼 | |
发表时间:2010-11-25
我有几个问题:
1) 有没有考虑CLOB/BLOB字段? 2) 没有考虑过一次性查询过多数据, 很可能会有 "ORA-1013", "ORA-1555"? 3) 如果你所处理的情况是target database肯定是没有数据的(也就是说insert时不会产生主键冲突), 你为什么不尝试用dblink? |
|
返回顶楼 | |
发表时间:2010-11-25
Sorry, Informix -> Oracle, DBlink不行
也不会有1) 和 2)的问题, 但是类似问题还是有的 |
|
返回顶楼 | |
发表时间:2010-11-25
用GoldenGate
最优选择 就是比较贵 |
|
返回顶楼 | |
发表时间:2010-11-25
jiayj198609 写道 imacback 写道 为什么要用java导呢,有没有别的解决方案???这个实在是慢,搞数据仓库的不错。
呵呵。。。解决方案有很多的;我也是实际项目赶到这儿了;所以就用java线程模式处理了! 老兄,你怎么随便拿我的文章到处贴,您也在大连银行工作?不过我好象没有见过你 |
|
返回顶楼 | |
发表时间:2010-11-25
有人转了我的文章,我就顺便说一下,计算每个线程数据传输量的算法有bug,有可能会丢一条数据,把原来的long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
改成 long ownerRecordNum = ((totalNum % syncThreadNum) == 0) ? (totalNum / syncThreadNum):((totalNum / syncThreadNum) + 1);就OK了 |
|
返回顶楼 | |
发表时间:2010-11-25
最后修改:2010-11-25
如果没有大字段,可导出为文本格式,再导入。
将informix导出的文本数据导入oracle数据库。 关于数据的导入,使用ORACLE的SQL Loader。 其它工具: ORACLE Migration bench(支持informix,db2的迁移)。 PS:原来这是转载的? |
|
返回顶楼 | |