`
Donald_Draper
  • 浏览: 981128 次
社区版块
存档分类
最新评论

百万级数据-程序迁移后续

阅读更多
百万级数据-程序迁移:http://donald-draper.iteye.com/blog/2327909
    在上面这一篇文章中,内存为2G的情况下,单线程分页数为10万,批量保存为5000的情况下,更新120万的数据,需要耗时20分钟左右,同时JVM被占满,由于以前认为数据更新一次就少,就没有优化;后来一次更新的记录达到百万,应用扛不住,现在总于抽出时间,来做一些优化。以前是单线程处理所有分页,现在每页通过一个线程去更新,每个线程获取一个jdbc连接,注意数据量过大,分页数小的情况下,jdbc连个可能同时需要建立多个,我们要保证数据库允许最大连接数够用,Oracle默认为100。
今天打算每一页用一个线程去更新,主要思路如下:
#######需要用到的变量
pageUpateSize:分页数
threadPoolSize:线程数
batchSize:批量保存数

sums为需要更新的记录数,我们测试的为126万,大于10万才分页更新
##############################
主线程核心代码:
ExecutorService exec = null;
int batches = 0;
if( sums > 100000){
	if(sums % pageUpateSize ==0){
		batches = sums/pageUpateSize;
	}
	else{
		batches = sums/pageUpateSize  + 1;
	}
}
AtomicInteger counts = new AtomicInteger(0);//更新记录数计数器
CountDownLatch doneSignal = new CountDownLatch(batches); 
exec = Executors.newFixedThreadPool(threadPoolSize);
for(int i =1;i<=batches;i++){
        //getConnection(),获取数据库连接
	exec.submit(new PageUpdateThread(getConnection(), tableName,(i-1)*pageUpateSize+1,(i)*pageUpateSize,counts,doneSignal));
}
doneSignal.await();//等待所有分页线程结束
logger.info("============All Insert Sizes:"+counts.get());




分页更新线程:
/**
 * 分页更新线程
 * @author donald
 * @date 2017-4-13
 * @time 下午4:37:07
 */
public class PageUpdateThread implements Runnable {
	private static final Logger log = LoggerFactory.getLogger(PageUpdateThread.class);
	private static int batchSize = 2500;
	private Connection con;
	private String tableName;
	private int startPos;
	private int endPos;
	private final  AtomicInteger totalCount;
	private final CountDownLatch doneSignal;
	private SynService synService = null;
	private String threadName;
	/**
	 * 
	 * @param con
	 * @param tableName
	 * @param startPos
	 * @param endPos
	 * @param totalCount
	 * @param doneSignal
	 */
	public PageUpdateAllThread(Connection con, String tableName,
			 int startPos, int endPos,
			AtomicInteger totalCount,CountDownLatch doneSignal) {
		super();
		this.con = con;
		this.startPos = startPos;
		this.endPos = endPos;
		this.totalCount = totalCount;
		this.doneSignal = doneSignal;
	}
	/**
	 * 
	 */
	private void init(){
		synService = new SynService();
		threadName = Thread.currentThread().getName();
	}
	@Override
	public void run() {
		init();
		try {
			log.info(threadName+"正在更新记录:"+startPos+","+endPos);
			work();
			log.info(threadName+"更新记录完毕:"+startPos+","+endPos);
		} catch (BatchUpdateException e) {
			e.printStackTrace();
		} catch (SQLException e) {
			e.printStackTrace();
		}
		finally{
			doneSignal.countDown();
		}
	}
	/**
	 * 
	 * @throws BatchUpdateException
	 * @throws SQLException
	 */
	private void work() throws BatchUpdateException, SQLException{
		ResultSet addRs = null;
		PreparedStatement ps = null;
		List<PageData> insertList = new ArrayList<PageData>();
		//分页语句
		String sql = "SELECT * FROM (SELECT t.*, ROWNUM as rowno FROM ( SELECT * FROM "
				+ tableName
				+ " ORDER BY CREATETIME"
				+ " ) t WHERE ROWNUM <= ?)" + " WHERE rowno >= ?";
		log.info(threadName+"======Search insert records sql:" + sql + ",startPos:"
				+ startPos + ",endPos:" + endPos);
		int counts = 0;// 记录数
		try {
			ps = con.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
					ResultSet.CONCUR_READ_ONLY);
			ps.setInt(1, endPos);
			ps.setInt(2, startPos);
			addRs = ps.executeQuery();
			while (addRs.next()) {
				HashMap dataMap = null;
				dataMap = switch(addRs);//将记录放在Map中
				insertList.add(pd);
				if (counts % batchSize == 0 && counts > 0) {
					long childStartTime = System.currentTimeMillis();
					synService.batchInsertSync(tableName + "Mapper.save", insertList);
					long childEndTime = System.currentTimeMillis();
					log.info(threadName+"保存2500记录所用时间s:"
							+ (childEndTime - childStartTime) / 1000.00);
					insertList.clear();
					log.info(threadName+"============Records:" + counts);
				}
				if (addRs.isLast()) {
					synService.batchInsertSync(tableName + "Mapper.save", insertList);
					insertList.clear();
				}

				pd = null;
				counts++;
				totalCount.incrementAndGet();
			}
		} catch (SQLException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		finally {
		        insertList = null;
			sql = null;
			if (addRs != null) {
				addRs.close();
				addRs = null;
			}
			if (ps != null) {
				ps.close();
				ps = null;
			}
			if (con != null) {
				con.close();
			}
		}
	}
}

下面我们来测试:
########################################
硬件环境如下:
硬件酷睿i7,4核处理器,JVM内存2G,记录数126万,数据库oracle
#########################################
JVM虚拟机参数配置:
-server
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-Xloggc:E:\gc.log


测试的过程中,我们主要调试着3个参数:
pageUpateSize:分页数
threadPoolSize:线程数
batchSize:批量保存数


先调线程数:
参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,30000,  5000, 1.039, 353.661

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 14:50:43
已用: 
 1,023,612 KB
已提交: 
 1,155,084 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      45.212 秒 (4,582收集)
ConcurrentMarkSweep上的       0.620 秒 (20收集)

VisualVM-内存使用情况图:




参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
12,30000,  5000, 1.645, 254.612

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 15:09:40
已用: 
   296,974 KB
已提交: 
 1,741,000 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      42.666 秒 (3,767收集)
ConcurrentMarkSweep上的       0.177 秒 (15收集)

VisualVM-内存使用情况图:




相对于8个线程,使用时间减少99秒,新生代和老年代的垃圾回收次数和时间减少,但同时内存的最大使用量增大了500M左后,
这是因为线程内更新的有记录,线程数增量,相应的峰值内存增加,内存占用变化较大。

参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
16,30000,  5000, 1.607,187.303

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 15:23:08
已用: 
   840,564 KB
已提交: 
 1,770,688 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      29.268 秒 (2,560收集)
ConcurrentMarkSweep上的       0.163 秒 (13收集)

VisualVM-内存使用情况图:



相对于12个线程,使用时间减少67秒,新生代和老年代的垃圾回收次数和时间减少,但同时内存的最大使用量没有多大变化,
线程数增量。
小节:
在分页数和批量保存数相同的情况下,线程数越多,所用时间越少,
同时所耗内存最大值变大,新生代和老年代的垃圾回收次数和时间减少;

下面来调分页数量参数:
参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,30000,  5000, 1.039, 353.661
Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 14:50:43
已用: 
 1,023,612 KB
已提交: 
 1,155,084 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      45.212 秒 (4,582收集)
ConcurrentMarkSweep上的       0.620 秒 (20收集)
以上面参数配置,内存使用量,耗时作为对比基础


参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,20000,  5000, 0.851, 411.734

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 15:40:23
已用: 
   202,855 KB
已提交: 
   893,508 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      42.290 秒 (4,530收集)
ConcurrentMarkSweep上的       0.236 秒 (23收集)

VisualVM-内存使用情况图:



在8个线程和批量保存数为5000的情况下,分页数量减少10000,内存的峰值减少了188M,
但所耗时间增加了58秒,新生代和老年代的垃圾回收次数和时间增加;

参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,10000,  5000, 0.622, 696.168

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 16:07:15
已用: 
   398,122 KB
已提交: 
   622,284 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      41.930 秒 (4,970收集)
ConcurrentMarkSweep上的       0.301 秒 (29收集)

VisualVM-内存使用情况图:



在线程数和批量保存数相同情况下,
在8个线程和批量保存数为5000的情况下,分页数量减少,内存的峰值减少,
但所耗时间增加,新生代和老年代的垃圾回收次数和时间增加;

再来调批量保存数量:
以上一个参数配置,内存使用量,耗时作为对比基础作为对比基础
参数设置及内存消耗和所用时间情况:
线程数,分页数,批量保存数,消耗内存最大值(G),耗时(s)
8,10000,  2500, 0.474, 664.131

Jconsole内存使用情况,垃圾回收次数和时间:
时间: 
2017-04-13 16:31:18
已用: 
   256,886 KB
已提交: 
   535,884 KB
最大值: 
 2,038,528 KB
GC 时间: 
ParNew上的      38.173 秒 (4,693收集)
ConcurrentMarkSweep上的       0.380 秒 (32收集)

VisualVM-内存使用情况图:



在8个线程,分页数为10000的情况下,批量保存数减少,内存的峰值减少,
但所耗时间增加,新生代和老年代的垃圾回收次数和时间增加;


总结:
在内存充足的情况,线程数量越多,所耗时间越少,新生代和老年代的垃圾回收次数和时间减少,但同时内存的峰值越大,在本次测试中,更新126万数据,内存峰值为1.607G,所耗时间为187.303s;平均每秒处理6737条记录,这个是在硬件酷睿i7,4核处理器,JVM内存2G情况下取得,假设有2个CPU,CPU为8核的,总共有16线程,内存为8G,保守估计,再想同时记录数和批量保存记录数的情况下每秒可以处理的记录数为(2x8x8)/(1x4x2)x6737,约每秒11万,拿相同时间可以处理的记录数来看,187.303s可以处理2057万数据。当然线程不是越多要好,凡是要有一个度,以前看过一篇文章,线程数最好为2xSum(CPU)xCore(CPU);从测试来看当线程数为Sum(CPU)xCore(CPU)的4倍时,性能相当好,不过这个要看具体的场景,仁者见仁智者见智啦;
如果内存不够用的情况,对处理耗时没有要求的话,我们可以减少线程数,分页数和批量保存数;现在有一台服务,奶奶的普通PC,4G的内存,由于服务器上还有其他数据库在跑,最后JVM只有650M的内存可用,由于应用服务为Server模式,默认为内存的1/4,但现在只有650M,由于应用没有时间上的要求,所以取线程数为8,分页数为10000,批量保存数为2500,更新120万的数据,10分钟还是可以接受的,内存峰值还不到500M。


  • 大小: 33.5 KB
  • 大小: 46.8 KB
  • 大小: 37.8 KB
  • 大小: 49.5 KB
  • 大小: 46.7 KB
  • 大小: 53.9 KB
0
0
分享到:
评论

相关推荐

    虚拟化技术-冷迁移概念.pptx

    - 在冷迁移过程中,如果选择了移至其他数据存储的选项,则迁移完成后会自动删除源主机和数据存储上的旧版本虚拟机,简化了后续的清理工作。 #### 三、冷迁移的实施方法 冷迁移支持三种不同的实施方式,以满足不同...

    Hadoop数据迁移--从Oracle向Hadoop

    在具体的实施过程中,可能还需要进行一系列的准备工作和后续的优化措施,比如调整Hadoop的配置参数以获得更好的性能,或者在数据迁移前对数据进行清洗和预处理,以确保数据质量。此外,数据迁移完成后,还需要进行...

    数据迁移整合的方案的报告.pdf

    数据迁移整合是IT领域中常见的任务,特别是在系统升级或新旧系统替换时。在这个报告中,我们关注的是如何有效地进行历史数据迁移和不同系统间的整合,以确保业务的连续性和系统的稳定性。 首先,新老系统迁移整合的...

    MySQL数据迁移到postgresql必备手册.pdf

    MySQL到PostgreSQL的数据迁移是一项常见的任务,特别是在企业需要利用PostgreSQL的特定功能或者希望转换到一个更开放的数据库系统时。以下是对标题和描述中提到的知识点的详细说明: 1. **数据迁移工具:Navicat ...

    MYCAT数据扩容+数据迁移

    - **安装MySQL客户端**: 确保MYCAT所在的环境中已安装MySQL客户端程序,这有助于后续的数据迁移操作。 - **添加JDBC驱动**: 在MYCAT的`lib`目录下添加MySQL的JDBC驱动包。这是为了确保MYCAT能够正确地与MySQL数据库...

    (迁移成分分析TCA)迁移学习算法程序实现_TCA迁移学习_TCA_迁移学习_迁移成分分析算法代码_

    在提供的"(迁移成分分析TCA)迁移学习算法程序实现.docx"文件中,你可能会找到TCA算法的具体Python代码实现,包括数据预处理、PCA计算、协方差矩阵处理、优化算法和应用变换的函数等。通过阅读和理解这段代码,你...

    人大金仓安装与数据迁移(windows)

    本文将详细介绍如何在Windows操作系统上安装人大金仓数据库(kingbase SE V8),以及如何进行数据迁移,特别是从MySQL迁移到金仓数据库。 首先,我们需要从人大金仓官方网站下载数据库安装包和授权码。下载地址为:...

    sqlserver和oracle数据迁移方案

    这些示例可以帮助评估迁移后的性能、数据完整性等问题,并提供必要的基准数据用于后续的性能调优工作。 总之,从SQL Server到Oracle或反之的数据迁移是一项复杂的工作,涉及到多个层面的知识和技术。通过上述详尽的...

    mysql与oracle数据迁移工具

    - 验证与测试:迁移后,必须验证数据的完整性,并进行功能测试,确保应用程序能在Oracle环境中正常运行。 6. 注意事项: - 权限管理:在迁移过程中,确保有必要的权限来操作两个数据库。 - 时间窗口:尽可能在低...

    IBM DB2,Oracle,MS-SQL Server,MySQL数据库数据迁移方法图解

    数据迁移是指将数据从一个系统移动到另一个系统的过程,这个过程可能涉及到不同的硬件、操作系统、文件系统或应用程序。对于企业而言,随着业务的发展和技术的进步,进行数据迁移变得越来越常见,例如升级数据库系统...

    数据迁移操作说明1

    这会根据`_InitialCreate.cs`或后续创建的迁移文件来调整数据库结构,同时保持已有数据完整。 5. **自动数据迁移**: 要设置自动数据迁移,在DbContext的构造函数中,调用`Database.SetInitializer`方法,传入一个...

    Maximo 迁移管理

    这包括确定哪些数据、配置和应用程序需要迁移,以及迁移后的系统应该如何运作。需求分析有助于识别潜在的问题和挑战,为后续的迁移计划制定提供依据。 ##### 2.2 工具选择 - **数据加载工具**:为了确保数据的准确...

    DXWB 数据迁移 SAP

    DXWB 数据迁移是 SAP 系统中用于将外部数据导入的一种方法,特别是涉及到与业务对象交互时,如资产主数据的转移。DXWB(Data Transfer Workbench)利用 IDoc(Intermediate Document)格式来处理这些外部数据,因为...

    (原)mssql数据迁移到oracle

    10. 后续维护:迁移完成后,别忘了更新任何依赖数据库的应用程序配置,并进行系统和数据验证,确保一切正常运行。 总的来说,从mssql到Oracle的数据迁移是一个涉及多个步骤和技术的复杂过程,需要对两种数据库系统...

    金仓数据库数据迁移服务KDTS使用手册.pdf

    - **增量迁移**:首次进行全量迁移后,后续只迁移新增或修改的数据。 - **定时迁移**:根据设定的时间周期自动执行数据迁移任务。 **3.4 迁移任务监控** KDTS提供实时的迁移任务监控功能,可以查看迁移进度、错误...

    [系统管理]数据迁移工具使用说明.pdf

    1. **启动数据迁移工具**:通常可以通过双击程序图标或者从命令行启动工具。启动后,用户界面会显示出来,准备进行后续操作。 2. **创建数据迁移项目**:在工具中创建一个新的项目,这一步主要是为了组织和管理...

    Mysql迁移Oracle方案

    - **新建DBA用户**:如“cssdj_zsy”,用于后续的数据迁移操作。同时,为该用户设置足够的权限,以便能够创建表空间和其他对象。 - **创建表空间TS_CSSDJ**:根据实际需求预设足够的存储空间。 - **设置字符集utf8**...

    基于某某平台的数据源迁移oracle -mysql

    ### 基于某某平台的数据源迁移:Oracle 至 MySQL #### 一、背景 本文档旨在介绍一种从 Oracle 数据库迁移到 MySQL 的方案。起初,项目使用的是本地 Oracle 数据库作为数据源,但考虑到每次运行系统都需要手动启动...

    mysql迁移oracle

    - 确保迁移后的 Oracle 数据库用户权限设置正确,以免影响后续的数据库管理和使用。 通过以上步骤,您可以成功地将 MySQL 数据库的数据迁移到 Oracle 数据库中。然而,迁移过程可能会因具体环境和需求有所不同,...

    数据库迁移工具安装程序

    综上所述,数据库迁移工具安装程序应该是为了简化和自动化上述过程而设计的,它应该包含数据迁移、对象转换、权限迁移等功能,并可能提供自动化脚本和报告功能,以帮助IT人员更高效、安全地完成数据库迁移任务。...

Global site tag (gtag.js) - Google Analytics