`
sunnylocus
  • 浏览: 876665 次
  • 性别: Icon_minigender_1
  • 来自: 美国图森
社区版块
存档分类
最新评论

经验总结:高性能的数据同步

    博客分类:
  • Java
阅读更多

     最近在做一个银行的生产数据脱敏系统,今天写代码时遇到了一个“瓶颈”,脱敏系统需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路如图

    首先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多线程处理,多个人干活总比一个人干活速度要快。

    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

 

在查询的时候我用了如下语句

String queryStr = "SELECT * FROM xx";

ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet,那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整Fetch的数量。另外性能也与网线的带宽有直接的关系。

相关代码

package com.dlbank.domain;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

/**
 *<p>title: 数据同步类 </p>  
 *<p>Description: 该类用于将生产核心库数据同步到开发库</p>  
 *@author Tank Zhang 
 */
public class CoreDataSyncImpl implements CoreDataSync {
	
	private List<String> coreTBNames; //要同步的核心库表名
	private ConnectionFactory connectionFactory;
	private Logger log = Logger.getLogger(getClass());
	
	private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数
	
	private int syncThreadNum;  //同步的线程数

	@Override
	public void syncData(int businessType) throws Exception {
		
		for (String tmpTBName : coreTBNames) {
			log.info("开始同步核心库" + tmpTBName + "表数据");
			// 获得核心库连接
			Connection coreConnection = connectionFactory.getDMSConnection(4);
			Statement coreStmt = coreConnection.createStatement();
			//为每个线程分配结果集
			ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);
			coreRs.next();
			//总共处理的数量
			long totalNum = coreRs.getLong(1);
			//每个线程处理的数量
			long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum)); 
			log.info("共需要同步的数据量:"+totalNum);
			log.info("同步线程数量:"+syncThreadNum);
			log.info("每个线程可处理的数量:"+ownerRecordNum);
			// 开启五个线程向目标库同步数据
			for(int i=0; i < syncThreadNum; i ++){
				StringBuilder sqlBuilder = new StringBuilder();
				//拼装后SQL示例
				//Select * From dms_core_ds Where id between 1 And 657398
				//Select * From dms_core_ds Where id between 657399 And 1314796
				//Select * From dms_core_ds Where id between 1314797 And 1972194
				//Select * From dms_core_ds Where id between 1972195 And 2629592
				//Select * From dms_core_ds Where id between 2629593 And 3286990
				//..
				sqlBuilder.append("Select * From ").append(tmpTBName)
						.append(" Where id between " ).append(i * ownerRecordNum +1)
						.append( " And ")
						.append((i * ownerRecordNum + ownerRecordNum));
				Thread workThread = new Thread(
						new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));
				workThread.setName("SyncThread-"+i);
				workThread.start();
			}
			while (currentSynCount.get() < totalNum);
			//休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作);
			//Thread.sleep(1000 * 3);
			log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据");
		}
	}// end for loop
	
	public void setCoreTBNames(List<String> coreTBNames) {
		this.coreTBNames = coreTBNames;
	}

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}
	
	public void setSyncThreadNum(int syncThreadNum) {
		this.syncThreadNum = syncThreadNum;
	}
	
	//数据同步线程
	final class WorkerHandler implements Runnable {
		ResultSet coreRs;
		String queryStr;
		int businessType;
		String targetTBName;
		public WorkerHandler(String queryStr,int businessType,String targetTBName) {
			this.queryStr = queryStr;
			this.businessType = businessType;
			this.targetTBName = targetTBName;
		}
		@Override
		public void run() {
			try {
				//开始同步
				launchSyncData();
			} catch(Exception e){
				log.error(e);
				e.printStackTrace();
			}
		}
		//同步数据方法
		void launchSyncData() throws Exception{
			// 获得核心库连接
			Connection coreConnection = connectionFactory.getDMSConnection(4);
			Statement coreStmt = coreConnection.createStatement();
			// 获得目标库连接
			Connection targetConn = connectionFactory.getDMSConnection(businessType);
			targetConn.setAutoCommit(false);// 设置手动提交
			PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");
			ResultSet coreRs = coreStmt.executeQuery(queryStr);
			log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);
			int batchCounter = 0; //累加的批处理数量
			while (coreRs.next()) {
				targetPstmt.setString(1, coreRs.getString(2));
				targetPstmt.setString(2, coreRs.getString(3));
				targetPstmt.setString(3, coreRs.getString(4));
				targetPstmt.setString(4, coreRs.getString(5));
				targetPstmt.setString(5, coreRs.getString(6));
				targetPstmt.addBatch();
				batchCounter++;
				currentSynCount.incrementAndGet();//递增
				if (batchCounter % 10000 == 0) { //1万条数据一提交
					targetPstmt.executeBatch();
					targetPstmt.clearBatch();
					targetConn.commit();
				}
			}
			//提交剩余的批处理
			targetPstmt.executeBatch();
			targetPstmt.clearBatch();
			targetConn.commit();
			//释放连接 
			connectionFactory.release(targetConn, targetPstmt,coreRs);
		}
	}
}

 

6
1
分享到:
评论
6 楼 cfying 2016-09-18  
不会有锁表的现象吗?楼主
5 楼 xiaokang1582830 2012-06-27  
这代码难道没问题,CoreDataSync是哪来的接口
4 楼 sunnylocus 2011-10-13  
cczakai 写道
感觉楼主的多线程理解有点偏差。



如果一条线程下来,1秒copy1000条数据,1分钟6000条,确实很难完成上亿条数据的任务量。



像楼主处理,应该是超线程CPU模式。


这里有篇文章:

随着计算机技术的发展,编程模型也越来越复杂多样化。但多线程编程模型是目前计算机系统架构的最终模型。随着CPU主频的不断攀升,X86架构的硬件已经成为瓶,在这种架构的CPU主频最高为4G。事实上目前3.6G主频的CPU已经接近了顶峰。
  如果不能从根本上更新当前CPU的架构(在很长一段时间内还不太可能),那么继续提高CPU性能的方法就是超线程CPU模式。那么,作业系统、应用程序要发挥CPU的最大性能,就是要改变到以多线程编程模型为主的并行处理系统和并发式应用程序。

http://hi.baidu.com/baixuejiyi1111/blog/item/1eee6ff50bd8d042342acc02.html



哇,好东西呀,多谢!
3 楼 cczakai 2011-10-13  
感觉楼主的多线程理解有点偏差。



如果一条线程下来,1秒copy1000条数据,1分钟6000条,确实很难完成上亿条数据的任务量。



像楼主处理,应该是超线程CPU模式。


这里有篇文章:

随着计算机技术的发展,编程模型也越来越复杂多样化。但多线程编程模型是目前计算机系统架构的最终模型。随着CPU主频的不断攀升,X86架构的硬件已经成为瓶,在这种架构的CPU主频最高为4G。事实上目前3.6G主频的CPU已经接近了顶峰。
  如果不能从根本上更新当前CPU的架构(在很长一段时间内还不太可能),那么继续提高CPU性能的方法就是超线程CPU模式。那么,作业系统、应用程序要发挥CPU的最大性能,就是要改变到以多线程编程模型为主的并行处理系统和并发式应用程序。

http://hi.baidu.com/baixuejiyi1111/blog/item/1eee6ff50bd8d042342acc02.html


2 楼 sunnylocus 2010-11-26  
3
yuhui0531 写道
顺便说一下,计算每个线程数据传输量的算法有bug,有可能会丢一条数据,把原来的long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
改成 long ownerRecordNum = ((totalNum % syncThreadNum) == 0) ? (totalNum / syncThreadNum):((totalNum / syncThreadNum) + 1);就OK了

这个是我在论坛里的回复
1 楼 yuhui0531 2010-11-26  
顺便说一下,计算每个线程数据传输量的算法有bug,有可能会丢一条数据,把原来的long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));
改成 long ownerRecordNum = ((totalNum % syncThreadNum) == 0) ? (totalNum / syncThreadNum):((totalNum / syncThreadNum) + 1);就OK了

相关推荐

    轻松解决异构数据同步:赶集网CDC数据同步方案实践.

    ### 轻松解决异构数据同步:赶集网CDC数据同步方案实践 #### 异构数据同步背景与挑战 随着互联网技术的发展以及用户需求的不断变化,网站面临越来越多的技术挑战,例如性能优化、跨平台支持、社交化的增强、数据...

    C#编程中关于数据缓存的经验总结

    ### C#编程中关于数据缓存的经验总结 #### 一、引言 在现代软件开发过程中,特别是Web应用开发中,提升程序性能是一项至关重要的任务。数据缓存作为一种提高应用程序性能的有效手段,已经被广泛应用于各种场景之中...

    高性能MySQL 第3版 中文 PDF

    总的来说,《高性能MySQL》第三版是MySQL管理员、开发人员和DBA的必备参考书,它不仅提供了丰富的理论知识,更强调实战技巧和经验分享,对于理解和提升MySQL数据库的高性能运行具有极高的价值。通过阅读本书,读者...

    终稿-李淼-高性能消息数据存储引擎的设计解析.pdf

    ### 高性能消息数据存储引擎的设计解析 #### 一、背景与需求分析 在现代互联网技术背景下,即时通讯(Instant Messaging, IM)已成为人们日常沟通不可或缺的一部分。随着用户数量的不断增长,对消息数据处理的需求...

    dsp SPI 应用经验总结以及例程FIFO

    ### DSP SPI 应用经验总结及例程FIFO #### 一、背景介绍 数字信号处理器(Digital Signal Processor,简称DSP)是一种专门用于快速执行数字信号处理算法的微处理器。TI公司的TMS320F2812(简称F2812)是一款广泛...

    LoadRunnet性能测试经验总结

    ### LoadRunnet性能测试经验总结 #### 一、性能测试规划阶段 **1. 明确测试目标** - **确定压力点**:首先要明确测试的目的,即确定系统的哪些部分需要承受压力测试,例如数据库访问、高并发请求处理等。 - **...

    Java面试题和经验总结2

    ### Java面试题和经验总结2 #### 一、面向对象的特征 面向对象编程(OOP)的核心特性有四个:抽象、继承、封装和多态。 1. **抽象**: - 定义:抽象是指从具体事物中抽取共同特征的过程。在面向对象编程中,抽象...

    qtp学习与实践经验总结

    通过阅读QTP学习与实践经验总结2.doc文档,你将能获得更具体的操作指导和案例分析,进一步提升你的QTP测试技能。 总之,QTP是一个强大且灵活的自动化测试工具,掌握其核心原理和实践经验,将对你的软件测试工作带来...

    otter分布式数据库同步系统 v4.2.18.zip

    版本v4.2.18是Otter的一个稳定版本,包含了对系统性能的优化和一些bug的修复,使得数据同步更加稳定可靠。 一、Otter概述 Otter的核心功能在于解决多数据库之间的数据一致性问题,它能够实现实时的数据迁移、复制和...

    高性能MySQL(第3版).Baron.Scbwartz等.扫描版

    根据提供的文件信息,“高性能MySQL(第3版).Baron.Scbwartz等.扫描版”,我们可以了解到这是一本关于MySQL性能优化的专业书籍。虽然提供的部分内容链接无法直接转化为文本信息,但从标题、描述以及标签来看,这...

    构建oracle高可用环境--企业级高可用数据库架构实战与经验总结

    ### 构建Oracle高可用环境——企业级高可用数据库架构实战与经验总结 在现代企业的IT基础设施中,数据库系统的稳定性与可用性至关重要。一个高效、可靠的数据库架构不仅能保证业务连续性,还能提升用户体验,增强...

    FPGA经验总结

    ### FPGA经验总结:深入解析数字电路设计中的时序与毛刺问题 #### 一、时序设计基础 在数字电路设计领域,特别是针对FPGA/CPLD等可编程逻辑器件,时序设计的重要性不可忽视。它直接影响着系统性能,尤其在高层次...

    FPGA设计经验总结

    ### FPGA设计经验总结 #### 一、Quartus II软件中的设计流程 1. **文本输入与图形输入**: - **文本输入**:通过文本编辑器直接输入VHDL或Verilog HDL等硬件描述语言,适用于逻辑复杂的模块设计。 - **图形输入*...

    性能测试经验总结资料,适宜初学和中级(由于是公司内部资料,不想花分勿下)

    ### 性能测试经验总结资料知识点详述 #### 一、概述 - **编写目的**:此文档旨在为参与性能测试的相关人员(如需求分析师、开发人员、测试人员等)提供一套全面且实用的性能测试指导手册。目的是帮助他们更好地...

    省级BI规范-数据质量管理系统建设方案

    总结,省级BI规范中的数据质量管理系统建设方案旨在通过完善的系统架构和流程,确保省级经营分析系统的数据质量,提高数据分析的准确性和决策的有效性。该方案不仅关注技术层面的实现,还强调组织和流程的管理,形成...

    c# 多线程 同步问题解决

    C# 的多线程编程为创建高性能应用程序提供了强大的支持,但同时也引入了线程同步的挑战。通过采用正确的同步策略和机制,开发者可以确保多线程程序的正确性和性能。lock 语句和 Monitor 类是解决 C# 中多线程同步...

    Hadoop、Spark在七牛数据平台的实战20150730.doc

    Spark的强项在于内存计算和流处理,这与日志处理的实时性和高性能需求相吻合。 7. 总结:七牛数据平台通过选用成熟的大数据工具,构建了一个高效、可扩展且能够处理大规模半结构化日志的数据平台。这个平台强调数据...

Global site tag (gtag.js) - Google Analytics