`
androidssh
  • 浏览: 114902 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

java(多线程)实现高性能数据同步

    博客分类:
  • java
阅读更多

    需要将生产环境上Infoxmix里的数据原封不动的Copy到另一台 Oracle数据库服务器上,然后对Copy后的数据作些漂白处理。为了将人为干预的因素降到最低,在系统设计时采用Java代码对数据作Copy,思路 如图:



    首 先在代码与生产库间建立一个Connection,将读取到的数据放在ResultSet对象,然后再与开发库建立一个Connection。从 ResultSet取出数据后通过TestConnection插入到开发库,以此来实现Copy。代码写完后运行程序,速度太慢了,一秒钟只能Copy 一千条数据,生产库上有上亿条数据,按照这个速度同步完要到猴年马月呀,用PreparedStatement批处理速度也没有提交多少。我想能不能用多 线程处理,多个人干活总比一个人干活速度要快。
    假设生产库有1万条数据,我开5个线程,每个线程分2000条数据,同时向开发库里插数据,Oracle支持高并发这样的话速度至少会提高好多倍,按照这 个思路重新进行了编码,批处理设置为1万条一提交,统计插入数量的变量使用 java.util.concurrent.atomic.AtomicLong,程序一运行,传输速度飞快CPU利用率在70%~90%,现在一秒钟可 以拷贝50万条记录,没过几分钟上亿条数据一条不落地全部Copy到目标库。

在查询的时候我用了如下语句
String queryStr = "SELECT * FROM xx";
ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

实习生问如果xx表里有上千万条记录,你全部查询出来放到ResultSet, 那内存不溢出了么?Java在设计的时候已经考虑到这个问题了,并没有查询出所有的数据,而是只查询了一部分数据放到ResultSet,数据“用完”它 会自动查询下一批数据,你可以用setFetchSize(int rows)方法设置一个建议值给ResultSet,告诉它每次从数据库Fetch多少条数据。但我不赞成,因为JDBC驱动会根据实际情况自动调整 Fetch的数量。另外性能也与网线的带宽有直接的关系。
相关代码

package com.dlbank.domain; 
 
import java.sql.Connection; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.Statement; 
import java.util.List; 
import java.util.concurrent.atomic.AtomicLong; 
 
import org.apache.log4j.Logger; 
 
/**
*<p>title: 数据同步类 </p>  
*<p>Description: 该类用于将生产核心库数据同步到开发库</p>  
*@author Tank Zhang 
*/ 
public class CoreDataSyncImpl implements CoreDataSync { 
     
    private List<String> coreTBNames; //要同步的核心库表名 
    private ConnectionFactory connectionFactory; 
    private Logger log = Logger.getLogger(getClass()); 
     
    private AtomicLong currentSynCount = new AtomicLong(0L); //当前已同步的条数 
     
    private int syncThreadNum;  //同步的线程数 
 
    @Override 
    public void syncData(int businessType) throws Exception { 
         
        for (String tmpTBName : coreTBNames) { 
            log.info("开始同步核心库" + tmpTBName + "表数据"); 
            // 获得核心库连接 
            Connection coreConnection = connectionFactory.getDMSConnection(4); 
            Statement coreStmt = coreConnection.createStatement(); 
            //为每个线程分配结果集 
            ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName); 
            coreRs.next(); 
            //总共处理的数量 
            long totalNum = coreRs.getLong(1); 
            //每个线程处理的数量 
            long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));  
            log.info("共需要同步的数据量:"+totalNum); 
            log.info("同步线程数量:"+syncThreadNum); 
            log.info("每个线程可处理的数量:"+ownerRecordNum); 
            // 开启五个线程向目标库同步数据 
            for(int i=0; i < syncThreadNum; i ++){ 
                StringBuilder sqlBuilder = new StringBuilder(); 
                //拼装后SQL示例 
                //Select * From dms_core_ds Where id between 1 And 657398 
                //Select * From dms_core_ds Where id between 657399 And 1314796 
                //Select * From dms_core_ds Where id between 1314797 And 1972194 
                //Select * From dms_core_ds Where id between 1972195 And 2629592 
                //Select * From dms_core_ds Where id between 2629593 And 3286990 
                //.. 
                sqlBuilder.append("Select * From ").append(tmpTBName) 
                        .append(" Where id between " ).append(i * ownerRecordNum +1) 
                        .append( " And ") 
                        .append((i * ownerRecordNum + ownerRecordNum)); 
                Thread workThread = new Thread( 
                        new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName)); 
                workThread.setName("SyncThread-"+i); 
                workThread.start(); 
            } 
            while (currentSynCount.get() < totalNum); 
            //休眠一会儿让数据库有机会commit剩余的批处理(只针对JUnit单元测试,
//因为单元测试完成后会关闭虚拟器,使线程里的代码没有机会作提交操作); 
            //Thread.sleep(1000 * 3); 
            log.info( "核心库"+tmpTBName+"表数据同步完成,共同步了" + currentSynCount.get() + "条数据"); 
        } 
    }// end for loop 
     
    public void setCoreTBNames(List<String> coreTBNames) { 
        this.coreTBNames = coreTBNames; 
    } 
 
    public void setConnectionFactory(ConnectionFactory connectionFactory) { 
        this.connectionFactory = connectionFactory; 
    } 
     
    public void setSyncThreadNum(int syncThreadNum) { 
        this.syncThreadNum = syncThreadNum; 
    } 
     
    //数据同步线程 
    final class WorkerHandler implements Runnable { 
        ResultSet coreRs; 
        String queryStr; 
        int businessType; 
        String targetTBName; 
        public WorkerHandler(String queryStr,int businessType,String targetTBName) { 
            this.queryStr = queryStr; 
            this.businessType = businessType; 
            this.targetTBName = targetTBName; 
        } 
        @Override 
        public void run() { 
            try { 
                //开始同步 
                launchSyncData(); 
            } catch(Exception e){ 
                log.error(e); 
                e.printStackTrace(); 
            } 
        } 
        //同步数据方法 
        void launchSyncData() throws Exception{ 
            // 获得核心库连接 
            Connection coreConnection = connectionFactory.getDMSConnection(4); 
            Statement coreStmt = coreConnection.createStatement(); 
            // 获得目标库连接 
            Connection targetConn = connectionFactory.getDMSConnection(businessType); 
            targetConn.setAutoCommit(false);// 设置手动提交 
            PreparedStatement targetPstmt =
targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)"); 
            ResultSet coreRs = coreStmt.executeQuery(queryStr); 
            log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr); 
            int batchCounter = 0; //累加的批处理数量 
            while (coreRs.next()) { 
                targetPstmt.setString(1, coreRs.getString(2)); 
                targetPstmt.setString(2, coreRs.getString(3)); 
                targetPstmt.setString(3, coreRs.getString(4)); 
                targetPstmt.setString(4, coreRs.getString(5)); 
                targetPstmt.setString(5, coreRs.getString(6)); 
                targetPstmt.addBatch(); 
                batchCounter++; 
                currentSynCount.incrementAndGet();//递增 
                if (batchCounter % 10000 == 0) { //1万条数据一提交 
                    targetPstmt.executeBatch(); 
                    targetPstmt.clearBatch(); 
                    targetConn.commit(); 
                } 
            } 
            //提交剩余的批处理 
            targetPstmt.executeBatch(); 
            targetPstmt.clearBatch(); 
            targetConn.commit(); 
            //释放连接  
            connectionFactory.release(targetConn, targetPstmt,coreRs); 
        } 
    } 

 

分享到:
评论

相关推荐

    JAVA 线程实现数据库的主从同步更新

    在Java编程环境中,...总之,使用Java线程实现数据库主从同步更新是一种常见且实用的技术手段,它涉及到多线程编程、数据库操作、事务管理等多个方面。理解和掌握这些知识点对于开发高可用性的分布式系统至关重要。

    Java多线程实现异步调用实例

    总之,Java多线程和异步调用是构建高效、响应迅速的应用程序的关键技术。通过合理利用这些工具和机制,开发者可以编写出能够充分利用多核处理器优势的代码,从而提高软件性能。在实际应用中,理解并熟练掌握这些概念...

    java多线程处理数据库数据

    在Java编程中,多线程处理是提升程序性能和效率的重要手段,特别是在处理大量数据库数据时。本主题将深入探讨如何使用Java的并发包(java.util.concurrent)来实现多线程对数据库数据的批量处理,包括增、删、改等...

    java多线程进度条

    总之,实现Java多线程进度条涉及线程同步、共享数据更新以及UI更新的协调。理解这些核心概念,并根据具体需求选择合适的方法,是构建高效、用户友好进度条的关键。在ProgressTest这个示例项目中,你可能会找到更多...

    java 多线程同步

    Java多线程同步是Java编程中关键的并发概念,它涉及到如何在多个线程访问共享资源时保持数据的一致性和完整性。`java.util.concurrent`包是Java提供的一个强大的并发工具库,它为开发者提供了多种线程安全的工具,...

    Java多线程优化百万级数据

    本篇将深入探讨如何利用Java多线程技术来优化这种高负载场景。 首先,理解Java多线程的基础至关重要。在Java中,线程是程序执行的最小单元,可以通过实现`Runnable`接口或继承`Thread`类来创建线程。创建多线程的...

    Java多线程同步具体实例讲解 .doc

    Java多线程同步是编程中一个非常重要的概念,特别是在并发编程和高并发系统设计中起到关键作用。在Java中,为了保证线程安全,避免数据竞争和不一致的状态,我们通常会使用同步机制来控制对共享资源的访问。本文将...

    java多线程实现五子棋游戏

    在Java编程领域,多线程技术是实现...综上所述,"java多线程实现五子棋游戏"项目涵盖了多线程编程、网络通信和棋盘游戏逻辑等多个方面的知识,通过Java的特性实现了在线对战的五子棋游戏,具有较高的技术含量和趣味性。

    JAVAJAVA多线程教学演示系统论文

    总的来说,《JAVA多线程教学演示系统》这篇论文不仅是对JAVA多线程技术的理论探讨,更是一个实践与教学相结合的优秀范例,对于提升学生的多线程编程能力具有很高的参考价值。VB图书管理系统论文范文虽不在本文讨论...

    java多线程设计

    总结,Java多线程设计是构建高性能、高并发应用的基础。通过理解并合理使用不可变对象,我们可以有效预防多线程环境中的非安全问题,确保程序的稳定性和正确性。在实际开发中,结合各种线程同步机制和并发工具,可以...

    JAVA多线程编程技术PDF

    总结起来,“JAVA多线程编程技术PDF”涵盖了多线程的基本概念、同步机制、线程通信、死锁避免、线程池以及线程安全的集合类等内容。通过深入学习这份资料,开发者可以全面掌握Java多线程编程技术,提升程序的并发...

    Java多线程技术精讲

    总的来说,Java多线程技术包括线程的创建、管理、同步、通信以及异常处理等多个方面。深入理解和熟练掌握这些知识,对于编写高效、可靠的并发程序至关重要。在实际开发中,结合《Java多线程编程核心技术_完整版 带...

    基于Java多线程实现所有顶点间最短路径的并行算法

    实验证明,在多核处理器环境下,通过Java多线程技术实现的并行算法相较于传统的单线程Dijkstra算法,在处理大规模图时具有明显的性能提升。尤其是在顶点数量较大时,这种提升更加明显。 #### 结论 本文提出了一种...

    多线程导入excel 数据

    在Java编程中,多线程导入Excel数据是一项常见的任务,特别是在大数据处理和高并发场景下。这个场景通常涉及到性能优化和资源管理,以确保系统稳定性和数据一致性。下面将详细阐述多线程导入Excel数据的核心知识点。...

    现代多线程 JAVA和c++多线程实现 测试和调试

    本资源主要探讨了如何在JAVA和C++中实现多线程,以及相关的测试和调试技术。 在JAVA中,多线程的实现主要依赖于`Thread`类和`Runnable`接口。开发者可以通过直接继承`Thread`类或实现`Runnable`接口来创建新的线程...

    java多线程详解(比较详细的阐述了多线程机制)

    总之,Java多线程是构建高性能并发应用的基础,理解并掌握线程的创建、同步、通信、协作模式以及异常处理,对于编写高效、稳定的Java程序至关重要。在实际开发中,结合Java提供的工具和设计模式,能够更好地解决多...

    java多线程代码案例(创建线程,主线程,线程优先级,线程组,线程同步,线程间的通信)

    本文将深入探讨Java多线程中的关键知识点,包括创建线程、主线程、线程优先级、线程组、线程同步以及线程间的通信。 1. **创建线程** 在Java中,可以通过两种方式创建线程:继承`Thread`类或实现`Runnable`接口。...

    java 多线程示例

    Java多线程是Java编程中的重要概念,尤其在开发高性能、高并发的应用中不可或缺。本示例旨在为初学者提供一个全面理解Java多线程的起点。通过学习这个实例,你可以掌握如何创建和管理线程,理解线程同步与通信的重要...

    java 多线程交互简单范例

    这个压缩包中的文件提供了几个关于Java多线程交互的实例,可以帮助我们深入理解如何在并发环境中控制线程的同步,确保数据的一致性和安全性。 首先,让我们讨论一下标题和描述中提到的关键概念——“多线程交互”和...

Global site tag (gtag.js) - Google Analytics