`
85977328
  • 浏览: 1898894 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发(二十四)多线程结果组装

 
阅读更多
本文主要介绍多线程的结果组装。其中可以忽略2处代码创建线程池的区别,请关注其他的业务逻辑代码。全部代码已经在附件中上传。如有疑问,请跟帖留言,笔者会予以答复。信号量相比自旋锁的优点很多,性能、代码简单。自旋锁不停得sleep并唤醒,而信号量的底层采用了wait进行编程,只唤醒一次即可。因此性能优越许多。


【自旋锁】
通常,我们会使用自旋锁进行多线程结果组装。这样的性能非常差。比如,大数据库表的多线程查询,无状态服务器的任务分发后的汇总等。
示例代码如下
package com.chinaso.search.spinlocks.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;

import com.chinaso.search.ServerSemaphore;
import com.chinaso.search.dao.DataDao;

/**
 * piaohailin 2014-3-21
 */
public class DataService {
    private final DataDao dataDao = new DataDao();

    /**
     * 多线程查询数据库
     * @param userId
     * @return
     * @throws Exception
     */
    public List<String> getAllData(final String userId) throws Exception {
        /**
         * 其中 第一个参数为初始空闲 
         * 第二个参数为最大线程 
         * 第三个参数为超时时间 
         * 第四个参数是超时时间的单位
         * 第五个参数是当超过最大线程数以后,可以放在队列中的线程 
         * 第六个参数 
         * 第七个参数是线程池塞满时候的策略
         */
        int corePoolSize = 2;
        int maximumPoolSize = 3;
        long keepAliveTime = 0;
        TimeUnit unit = TimeUnit.NANOSECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(5);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        /**
         * AbortPolicy 如果总线成熟超过maximumPoolSize + workQueue
         * ,则跑异常java.util.concurrent.RejectedExecutionException
         */
        RejectedExecutionHandler handler = new AbortPolicy();

        // ExecutorService 为线程池的接口
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>();

        final int count = dataDao.getCount(userId); // 总记录数
        System.out.println("count=" + count);
        // 如果总记录数小于设置的阈值,就直接单线程查询
        int threadCount = ServerSemaphore.threadCount;
        if (count < ServerSemaphore.hold) {
            threadCount = 1;
        }
        int section = count / threadCount; // 区间大小
        // 创建线程
        for (int i = 0; i < threadCount; i++) {
            final int begin = i * section;
            final int end;
            // 最后一个区间判断
            if ((i + 1) == threadCount) {
                end = count;
            } else {
                end = (i + 1) * section;
            }
            System.out.print("begin=" + begin);
            System.out.print(",end=" + end);
            System.out.println(",size=" + (end - begin));

            // 根据总记录数count和线程数Server.threadCount进行分页任务分发
            Future<List<String>> future = executor.submit(new Callable<List<String>>() {
                @Override
                public List<String> call() throws Exception {
                    List<String> data = new ArrayList<String>();
                    try {
                        data = dataDao.find(userId, begin, end);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                    return data;
                }
            });
            futures.add(future);
        }

        this.waitForDone(futures);

        // 结果的组装
        List<String> reuslt = new ArrayList<String>();
        for (Future<List<String>> future : futures) {
            List<String> tmp = future.get();
            reuslt.addAll(tmp);
        }
        return reuslt;
    }

    private void waitForDone(List<Future<List<String>>> futures) {
        boolean done = false;
        while (!done) {
            done = true;
            for (Future<List<String>> future : futures) {
                future.isDone();
                if (!future.isDone()) {
                    done = false;
                    try {
                        Thread.sleep(50);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    break;
                }
            }
        }
    }
}


【信号量】
代码相比线程轮询更简单,思路清晰,而且可以共用一个线程池来进行全局的控制。当所有线程完成任务的时候,唤醒组装线程进行结果的反馈。
package com.chinaso.search.semaphore.service;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.chinaso.search.ServerSemaphore;
import com.chinaso.search.dao.DataDao;
import com.chinaso.search.semaphore.concurrent.DataSemaphore;

/**
 * piaohailin
 * 2014-3-22
 */
public class DataService {
    private Executor executor = Executors.newFixedThreadPool(4); // 共用线程池,是为了从全局角度,叫多线程可控
    private final DataDao dataDao = new DataDao();

    /**
     * 多线程查询数据库
     * @param userId
     * @return
     * @throws Exception
     */
    public List<String> getAllData(final String userId) throws Exception {
        final DataSemaphore semaphore = new DataSemaphore(0);

        final int count = dataDao.getCount(userId); // 总记录数
        System.out.println("count=" + count);
        // 如果总记录数小于设置的阈值,就直接单线程查询
        int threadCount = ServerSemaphore.threadCount;
        if (count < ServerSemaphore.hold) {
            threadCount = 1;
        }
        int section = count / threadCount; // 区间大小
        // 创建线程
        for (int i = 0; i < threadCount; i++) {
            final int begin = i * section;
            final int end;
            // 最后一个区间判断
            if ((i + 1) == threadCount) {
                end = count;
            } else {
                end = (i + 1) * section;
            }
            System.out.print("begin=" + begin);
            System.out.print(",end=" + end);
            System.out.println(",size=" + (end - begin));

            // 根据总记录数count和线程数Server.threadCount进行分页任务分发
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        List<String> data = dataDao.find(userId, begin, end);
                        semaphore.fillData(data);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    } finally {
                        // 执行成功后,发放授权
                        semaphore.release();
                    }
                }
            });
        }

        semaphore.acquire(threadCount); // 等待授权数量满足条件,放行
        return semaphore.getResult();
    }
}


【CompletionService】
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已经完成的结果,而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
package com.chinaso.search.completionservice.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;

import com.chinaso.search.ServerSemaphore;
import com.chinaso.search.dao.DataDao;
import com.chinaso.search.semaphore.concurrent.DataSemaphore;

/**
 * piaohailin
 * 2014-3-22
 */
public class DataService {
    private Executor executor = Executors.newCachedThreadPool(); // 共用线程池,是为了从全局角度,叫多线程可控
    private final DataDao dataDao = new DataDao();

    /**
     * 多线程查询数据库
     * 
     * @param userId
     * @return
     * @throws Exception
     */
    public List<String> getAllData(final String userId) throws Exception {
        final int count = dataDao.getCount(userId); // 总记录数
        System.out.println("count=" + count);
        // 如果总记录数小于设置的阈值,就直接单线程查询
        int threadCount = ServerSemaphore.threadCount;
        if (count < ServerSemaphore.hold) {
            threadCount = 1;
        }
        int section = count / threadCount; // 区间大小
        // 创建线程
        CompletionService<List<String>> completionService = new ExecutorCompletionService<List<String>>(executor);// 多线程任务管理
        for (int i = 0; i < threadCount; i++) {
            final int begin = i * section;
            final int end;
            // 最后一个区间判断
            if ((i + 1) == threadCount) {
                end = count;
            } else {
                end = (i + 1) * section;
            }
            System.out.print("begin=" + begin);
            System.out.print(",end=" + end);
            System.out.println(",size=" + (end - begin));

            // 根据总记录数count和线程数Server.threadCount进行分页任务分发

            completionService.submit(new Callable<List<String>>() {
                @Override
                public List<String> call() throws Exception {
                    List<String> data = null;
                    try {
                        data = dataDao.find(userId, begin, end);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    } finally {
                        if (data == null) {
                            data = new ArrayList<String>();
                        }
                    }
                    return data;
                }
            });
        }

        List<String> result = new ArrayList<String>();
        for (int i = 0; i < threadCount; i++) {
            result.addAll(completionService.take().get()); // 取得结果,如果没有返回,则阻塞
        }
        return result;
    }
}


【更新历史】
2014-04-26
上传1.1版本,增加CompletionService的接口用法。

2014-05-04
上传1.2版本,重构service,使代码结构更清晰易读。
  • 大小: 104.8 KB
2
0
分享到:
评论

相关推荐

    java多线程查询数据库

    本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil.java`和`BatchDataRunnable.java`可能涉及的关键知识点。 ### 1. 多线程并发查询 多线程并发查询允许我们将一...

    13-Java并发编程学习宝典.zip

    "13-Java并发编程学习宝典.zip" 包含了一系列关于Java并发编程的学习资源,旨在帮助开发者掌握多线程编程的核心技术和最佳实践。以下是这些资源所涵盖的关键知识点: 1. **多线程基础** - "03 多线程开发如此简单—...

    基于Java多线程与线程安安全实践-基于Http协议的断点续传的毕业设计,通过分析用户的网络环境和网络状况,确定合适的分包大小

    设计采用多线程技术,开启多个线程同时处理多个用户的下载请求,提高系统的并发处理能力和吞吐量。同时采用线程安全技术,通过锁的使用避免多个线程同时修改同一资源,保证数据的一致性和准确性。 具体实现中,设计了...

    飞鸽传书JAVA源代码

    此外,JAVA的多线程技术也是实现飞鸽传书并发处理的关键。通过Thread或Runnable接口,可以创建多个执行线程,分别处理不同的通信请求,提高系统的并发处理能力。 总的来说,通过分析这个JAVA实现的飞鸽传书源代码,...

    java常见错误集合以及描述

    **描述**:当多线程环境下尝试修改正在遍历的数据结构时会抛出此异常。 **解决方案**: 1. **使用迭代器的remove方法**:使用Iterator的remove方法安全地删除元素。 2. **使用并发集合类**:如ConcurrentHashMap等...

    高级Java经典面试题2019

    #### 多线程并发与并行 - **并发**:指一个系统内多个进程或线程交替执行的过程,每个任务都可能被执行一小段时间,然后让位给其他任务,使得多个任务看起来同时运行。 - **并行**:指多个任务在同一时刻执行,通常...

    EJB概述(下) Java数据库 操作

    1. **组装应用程序**:将多个组件组合成一个完整的应用程序,包括但不限于EJB Beans、客户端、Applets、Servlets等。 2. **定义部署策略**:确定应用程序的部署方式,比如部署到何种服务器环境、如何处理并发请求等...

    java-lan-chat-system.rar_聊天 文件 java

    在Java中,多线程是实现并发处理的关键。聊天系统通常需要同时处理用户的输入、发送消息、接收消息等多个任务,因此会使用多线程来保证系统的响应速度和效率。 4. **Socket编程**: Java的Socket类提供了在网络中...

    Java大文件传输示例额

    此外,为了优化性能,可以考虑使用多线程并发传输数据块,或者使用Java NIO的非阻塞I/O,以提高文件传输的效率。同时,合理设置数据块的大小也很关键,过大可能导致内存压力,过小则可能增加网络开销。 总结来说,...

    软件开发+Java+核心知识

    本篇文章将详细探讨Java的核心知识,包括JVM(Java虚拟机)、多线程、泛型以及Spring框架。 首先,我们来深入理解JVM,它是Java程序运行的基石。JVM负责将字节码解释为机器码执行,提供了一个与硬件无关的运行环境...

    java实现CMPP3.0源代码

    9. **多线程处理**:为了提高并发处理能力,Java实现通常会使用多线程技术,分别处理连接、发送、接收和状态报告等任务。 10. **日志记录**:良好的日志记录是调试和排查问题的关键,源代码会包含日志模块,记录与...

    JAVA文件传输(论文+源代码).rar

    总之,"JAVA文件传输(论文+源代码)"提供的资料可以帮助学习者全面理解Java环境下的文件传输技术,包括基础的网络编程、多线程并发、数据流管理和高级特性,对于提升Java网络编程技能和毕业设计能力非常有帮助。...

    java银行系统完整代码

    6. **多线程与并发控制**:银行系统可能面临大量并发请求,因此需要使用Java的线程机制,如`Thread`类、`ExecutorService`和`synchronized`关键字,来处理多线程环境下的任务执行和同步问题。 7. **Spring框架**:...

    基于Java的会议室管理系统源码 .zip

    7. **并发与多线程**:在处理多个预定请求时,Java的并发API(如ExecutorService、Semaphore等)可以保证系统的高效运行。 8. **异常处理**:通过try-catch-finally语句和自定义异常,提高程序的健壮性。 【Spring...

    34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元.pdf

    `CompletableFuture`是Java 8引入的一个用于异步编程的重要工具,它的设计目的是解决多步骤计算任务中的依赖管理问题,以提高并发性能。传统的并发编程通常涉及到线程池和同步机制,但`CompletableFuture`提供了一种...

    模拟tomcat的工作原理

    在模拟Tomcat的过程中,了解并实现这些步骤可以帮助我们更好地理解Web服务器的工作流程,特别是Java的多线程模型在其中的应用。同时,标签"java tomcat"提示我们需要关注的是与Java和Tomcat相关的技术,例如Servlet...

Global site tag (gtag.js) - Google Analytics