`

多线程多批量插入大数据

阅读更多

 

参考  https://blog.csdn.net/xunwei0303/article/details/80241340?utm_source=blogxgwz1

 

创建多个线程,每个线程处理一批数据。

 

1. 创建表(mysql)

Sql代码  收藏代码
  1. CREATE TABLE TEST_BATCH_INSERT  
  2. (  
  3.   TEST_ID bigint PRIMARY key,  
  4.   TEST_NAME VARCHAR(100),  
  5.   AGE INT(5),  
  6.   CREATE_TIME DATETIME DEFAULT current_timestamp,  
  7.   UPDATE_TIME DATETIME DEFAULT current_timestamp  
  8. ) comment '测试批量插入';  

 

2. java bean

Java代码  收藏代码
  1. public class TestBatchInsertInfo {  
  2.     private Long testId;  
  3.   
  4.     private String testName;  
  5.   
  6.     private Integer age;  
  7.   
  8.     private Date createTime;  
  9.   
  10.     private Date updateTime;  
  11.   
  12.     // 省略getter/setter  
  13. }  

 

3. dao

Java代码  收藏代码
  1. public interface ITestBatchInsertMapper {  
  2.   
  3.     void batchInsert(List<TestBatchInsertInfo> list);  
  4. }  

 

4. mapper.xml

Xml代码  收藏代码
  1. <insert id="batchInsert" parameterType="java.util.List">  
  2.     INSERT INTO TEST_BATCH_INSERT   
  3.     (  
  4.      TEST_ID, TEST_NAME, AGE, CREATE_TIME, UPDATE_TIME  
  5.     )  
  6.     VALUES  
  7.     <foreach collection="list" item="log" index"index" separator =",">  
  8.         (  
  9.         #{log.testId, jdbcType=NUMERIC}, #{log.testName, jdbcType=VARCHAR}, #{log.age, jdbcType=NUMERIC},   
  10.         sysdate(), sysdate()  
  11.         )  
  12.     </foreach>  
  13.   </insert>  

 

5. 多线程

Java代码  收藏代码
  1. public class TestBatchInsertThread implements Runnable {  
  2.   
  3.     private ITestBatchInsertMapper testBatchInsertMapper;  
  4.   
  5.     /** 数据集合 */  
  6.     private List<TestBatchInsertInfo> list;  
  7.     /** 每个线程处理的起始数据 */  
  8.     private CountDownLatch begin;  
  9.     /** 每个线程处理的结束数据 */  
  10.     private CountDownLatch end;  
  11.   
  12.     public TestBatchInsertThread() {  
  13.     }  
  14.   
  15.     public TestBatchInsertThread(List<TestBatchInsertInfo> list, CountDownLatch begin, CountDownLatch end,  
  16.             ITestBatchInsertMapper testBatchInsertMapper) {  
  17.         this.list = list;  
  18.         this.begin = begin;  
  19.         this.end = end;  
  20.         this.testBatchInsertMapper = testBatchInsertMapper;  
  21.     }  
  22.   
  23.     @Override  
  24.     public void run() {  
  25.         try {  
  26.             if (list != null && !list.isEmpty()) {  
  27.                 testBatchInsertMapper.batchInsert(list);  
  28.             }  
  29.             // 执行完让线程直接进入等待  
  30.             begin.await();  
  31.         } catch (Exception e) {  
  32.             e.printStackTrace();  
  33.         } finally {  
  34.             // 当一个线程执行完 了计数要减一不然这个线程会被一直挂起  
  35.             end.countDown();  
  36.         }  
  37.     }  
  38. }  

 

6. service

多线程处理的方法是 batchInsertByThread;

 

普通批量处理的方法是 batchInsert

 

 

@Service(value = "testBatchInsertService")
public class TestBatchInsertServiceImpl implements ITestBatchInsertService {

    @Autowired
    private ITestBatchInsertMapper testBatchInsertMapper;

    @Override
    @Transactional
    public void batchInsertByThread(List<TestBatchInsertInfo> list) throws Exception {

        if (list == null || list.isEmpty()) {
            return;
        }
        // 一个线程处理300条数据
        int count = 1000;
        // 数据集合大小
        int listSize = list.size();
        // 开启的线程数
        int runSize = (listSize / count) + 1;
        // 存放每个线程的执行数据
        List<TestBatchInsertInfo> newList = null;
        // 创建一个线程池,数量和开启线程的数量一样
        ExecutorService executor = Executors.newFixedThreadPool(runSize);
        // 创建两个个计数器
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(runSize);

        for (int i = 0; i < runSize; i++) {
            /* 计算每个线程执行的数据 */
            if ((i + 1) == runSize) {
                int startIdx = (i * count);
                int endIdx = list.size();

                newList = list.subList(startIdx, endIdx);
            } else {
                int startIdx = (i * count);
                int endIdx = (i + 1) * count;

                newList = list.subList(startIdx, endIdx);
            }
            TestBatchInsertThread thread = new TestBatchInsertThread(newList, begin, end, testBatchInsertMapper);

            executor.execute(thread);
        }
        begin.countDown();
        end.await();

        executor.shutdown();
    }

    @Override
    public void batchInsert(List<TestBatchInsertInfo> list) {

        if (list == null || list.isEmpty()) {
            return;
        }

        List<TestBatchInsertInfo> tempList = new LinkedList<>();

        for (int i = 0; i < list.size(); i++) {
            
            tempList.add(list.get(i));
            
            if (i % 1000 == 0) {
                testBatchInsertMapper.batchInsert(tempList);
                tempList.clear();
            }
        }
        testBatchInsertMapper.batchInsert(tempList);
    }
}

 

7. junit4 测试方法

Java代码  收藏代码
  1. import java.util.LinkedList;  
  2. import java.util.List;  
  3.   
  4. import org.junit.Test;  
  5. import org.junit.runner.RunWith;  
  6. import org.springframework.beans.factory.annotation.Autowired;  
  7. import org.springframework.boot.test.context.SpringBootTest;  
  8. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
  9.   
  10. import com.jieshun.springboot.mybatis.MybatisApplication;  
  11. import com.jieshun.springboot.mybatis.bean.po.TestBatchInsertInfo;  
  12. import com.jieshun.springboot.mybatis.service.ITestBatchInsertService;  
  13.   
  14.   
  15. @RunWith(SpringJUnit4ClassRunner.class)  
  16. @SpringBootTest(classes = MybatisApplication.class/*, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT*/)  
  17. public class TestBatchInsertService {  
  18.   
  19.     @Autowired  
  20.     private ITestBatchInsertService testBatchInsertService;  
  21.   
  22.     @Test  
  23.     public void batchInsertByThread() {  
  24.   
  25.         long startTime = System.currentTimeMillis();  
  26.   
  27.         try {  
  28.             List<TestBatchInsertInfo> list = new LinkedList<>();  
  29.   
  30.             TestBatchInsertInfo info = null;  
  31.   
  32.             for (int i = 0; i < 100301; i++) {  
  33.   
  34.                 Integer ig = i;  
  35.   
  36.                 info = new TestBatchInsertInfo();  
  37.                 info.setTestId(ig.longValue());  
  38.                 info.setTestName("test名称_" + i);  
  39.                 info.setAge(i);  
  40.   
  41.                 list.add(info);  
  42.             }  
  43.   
  44.             testBatchInsertService.batchInsertByThread(list);  
  45.   
  46.             System.out.println("------Batch Insert Success------");  
  47.   
  48.         } catch (Exception e) {  
  49.             e.printStackTrace();  
  50.         }  
  51.         System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));  
  52.     }  
  53.   
  54.     @Test  
  55.     public void batchInsert() {  
  56.   
  57.         long startTime = System.currentTimeMillis();  
  58.   
  59.         try {  
  60.             List<TestBatchInsertInfo> list = new LinkedList<>();  
  61.   
  62.             TestBatchInsertInfo info = null;  
  63.   
  64.             for (int i = 0; i < 100301; i++) {  
  65.   
  66.                 Integer ig = i;  
  67.   
  68.                 info = new TestBatchInsertInfo();  
  69.                 info.setTestId(ig.longValue());  
  70.                 info.setTestName("test名称_" + i);  
  71.                 info.setAge(i);  
  72.   
  73.                 list.add(info);  
  74.             }  
  75.   
  76.             testBatchInsertService.batchInsert(list);  
  77.   
  78.             System.out.println("------Batch Insert Success------");  
  79.   
  80.         } catch (Exception e) {  
  81.             e.printStackTrace();  
  82.         }  
  83.         System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));  
  84.   
  85.     }  
  86. }  

 

8. springboot 启动类

Java代码  收藏代码
  1. import org.mybatis.spring.annotation.MapperScan;  
  2. import org.springframework.boot.SpringApplication;  
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;  
  4. import org.springframework.context.annotation.ComponentScan;  
  5. import org.springframework.transaction.annotation.EnableTransactionManagement;  
  6.   
  7. /** 
  8.  * 应用启动类 
  9.  *  
  10.  * @author  
  11.  * @date 2018年10月17日 
  12.  * @since JDK 1.8 
  13.  */  
  14. @SpringBootApplication  
  15. @EnableTransactionManagement  
  16. @ComponentScan(basePackages = { "com.jieshun.springboot.mybatis" })  
  17. @MapperScan(basePackages = { "com.jieshun.springboot.mybatis.dao" })  
  18. public class MybatisApplication {  
  19.   
  20.     public static void main(String[] args) {  
  21.         SpringApplication.run(MybatisApplication.class, args);  
  22.     }  
  23.   
  24. }  

 

 

本文转自:http://xurichusheng.iteye.com/blog/2433024

分享到:
评论

相关推荐

    C#大数据批量插入Access程序

    - **优化数据读取**:如果数据来自文件,可以使用多线程并行读取,提高数据加载速度。 - **批量操作间隔**:在大量数据插入过程中,适当加入延迟,防止对数据库造成过大压力。 这个"C#大数据批量插入Access程序"的...

    C#处理大容量数据,及多线程简单应用

    C#作为一种强大的编程语言,提供了多种策略来高效地管理大数据并优化多线程应用,以提高性能和用户体验。以下将详细介绍“C#处理大容量数据,及多线程简单应用”这一主题。 首先,当我们面临大量数据时,一个关键的...

    socket 大数据并列接收存数据库小列子(带多线程模拟数据)

    在处理大数据时,可能需要考虑分片(Sharding)、分区(Partitioning)或者批量插入(Bulk Insert)等策略,以提高写入效率和查询性能。此外,事务的正确性和一致性也很重要,尤其是在多线程环境下,需要确保数据的...

    java多线程实现大批量数据导入源码

    在Java编程中,多线程技术是处理大数据批量导入或导出的重要手段。它能有效提升程序执行效率,尤其在数据库操作这样的I/O密集型任务中。本项目以"java多线程实现大批量数据导入源码"为题,旨在通过多线程策略将大量...

    Java多线程Executors批量执行数据实现限流

    Java多线程实现数据切割批量执行,实现限流操作。 java线程池Executors实现数据批量操作。 批量异步Executors处理数据,实现限流操作,QPS限流。 线程池调用第三方接口限流实现逻辑。 案例适合: 1.批量处理大数据。...

    EasyExcel 并发读取文件字段并进行校验,数据写入到新文件,批量插入数据到数据库

    EasyExcel提供了一种多线程并发读取Excel数据的方式,通过`readSheetHandler`和`executeParallel`方法,可以将读取任务分配到多个线程中执行。每个线程独立处理一部分行数据,这样可以显著提升数据处理速度。 3. *...

    android sqlite 批量插入数据优化代码

    6. **考虑使用异步操作**:如果可能,使用线程或异步任务进行批量插入,以免阻塞主线程,提高用户体验。 7. **数据分批**:对于非常大的数据集,可以考虑分批插入,每次插入一部分数据,避免一次性加载全部数据导致...

    C# Access 大数据量 批量 效率 快速 导入

    在"C#大数据效率批量插入Access-demo"这个示例项目中,很可能是提供了一个实际的C#代码示例,演示了如何使用上述方法将大量数据高效地导入到Access数据库。通过分析和学习这个示例,开发者可以了解并掌握在C#环境下...

    Sqlserver大数据量插入速度慢或丢失数据的解决方法

    1. **并发控制不当**:多线程或多个进程同时插入数据时,如果没有适当的并发控制机制,可能会导致数据丢失。 2. **事务管理问题**:如果在事务未提交的情况下就发生异常或者中断,则可能导致部分数据未能成功写入...

    JdbcTemplate的批量方法使用

    在这个场景中,我们将详细探讨如何使用`JdbcTemplate`进行批量插入和删除操作。 批量插入操作在数据库处理大量数据时非常常见,它可以显著提高性能,因为数据库通常会优化批量处理,而不是单独处理每一项。在提供的...

    MySql 快速插入千万级大数据的方法示例

    本文将探讨如何优化这一过程,以实现快速插入,并通过具体的Java多线程和MySQL批量插入策略来提升性能。 首先,我们需要理解传统的逐条插入方式在大数据量下效率极低,因为它涉及到频繁的数据库交互,这会消耗大量...

    C#中海量数据的批量插入和更新[顶].pdf

    在C#中处理海量数据的批量插入和更新是一项常见的任务,尤其是在大数据应用或者ETL(提取、转换、加载)流程中。尽管ADO.NET在某些方面可能不如JDBC提供那么直观和便捷的批量操作API,但它仍然提供了高效的方法来...

    利用Java多线程技术导入数据到Elasticsearch的方法步骤

    本文将介绍使用Java的多线程技术来提高数据导入效率的方法步骤,以期帮助读者更好地理解并应用这一技术。 ### Java多线程技术 Java中的多线程技术是指在同一个程序中可以同时运行多个线程来执行多个任务。Java提供...

    elasticsearch进行批量插入的时候总是少数据

    一、问题 现有a表和b表,两张mysql数据库的表,需要把两张表的数据取共同字段,合并并导入es中,其中a表共有数据1000条,b表共有数据1200条,a表和b表的主键id都是从1开始递增的,结果导入的时候显示成功导入2200条...

    java链接并对hbase进行增删改查操作的实例代码(含批量插入,范围查询等,并包含所需jar包)

    Java链接HBase进行增删改查操作是大数据领域常见的任务,尤其在处理大规模分布式存储时。HBase,作为Apache Hadoop生态...在实际项目中,你可能需要根据需求调整这些操作,比如增加过滤器、使用多线程处理批量操作等。

    超大xml解析导入数据库、千万级别大数据导出到Excel。实现核心:高性能、分段、分页循环:读取-写入-清空内存。解.zip

    同时,数据库端可能需要使用批量插入或者存储过程来提高插入速度。此外,预处理SQL语句、索引优化以及适当的事务管理也是提升性能的关键。 其次,将千万级别的大数据导出到Excel,通常会遇到Excel文件大小限制和...

    java对大数据的处理.pdf

    总结起来,Java在处理大数据时,主要采用了分批读取、批量入库、多线程并行处理和线程池管理等策略。这些技术能够有效优化内存使用,提高处理效率,同时防止服务器过载。在实际应用中,需要根据服务器配置和数据量...

    spring Batch实现数据库大数据量读写

    通过批处理操作,JdbcBatchItemWriter 可以高效地执行大量的数据库插入、更新操作,提高性能。 ### 5. 批处理作业配置 在 Spring Batch 中,作业(Job)由一个或多个步骤(Step)组成。每个步骤通常包含读取、处理...

    Hbase调用JavaAPI实现批量导入操作

    - 使用多线程并行处理:可以将数据分成多个部分,每个部分在单独的线程中导入。 - 控制批大小:合理设置批量操作的大小,避免内存压力过大或过小导致性能问题。 - 合理配置HBase:调整HBase的Region大小,负载...

Global site tag (gtag.js) - Google Analytics