`
85977328
  • 浏览: 1903533 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发(二十)线程池

 
阅读更多
基本介绍
线程池(Thread Pool)对于限制应用程序中同一时刻运行的线程数很有用。因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。

我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。


线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。以后会再深入有关 Java 实现多线程服务器的细节。

Java 5 在 java.util.concurrent 包中自带了内置的线程池,所以你不用非得实现自己的线程池。你可以阅读我写的 java.util.concurrent.ExecutorService 的文章以了解更多有关内置线程池的知识。不过无论如何,知道一点关于线程池实现的知识总是有用的。

这里有一个简单的线程池实现:
package com.chinaso.search.phl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 线程池类,他是一个容器
 * 
 * @author piaohailin
 * 
 */
public class ThreadPool {
    private BlockingQueue<Runnable> taskQueue = null; // 任务队列
    private List<PoolThread> threads = new ArrayList<PoolThread>(); // 执行线程
    private boolean isStopped = false; // 线程池运行状态

    /**
     * 
     * @param noOfThreads
     *            线程数
     * @param maxNoOfTasks
     *            队列数
     */
    public ThreadPool(int noOfThreads, int maxNoOfTasks) {
        taskQueue = new ArrayBlockingQueue<Runnable>(maxNoOfTasks);
        for (int i = 0; i < noOfThreads; i++) {
            threads.add(new PoolThread(taskQueue));
        }
        for (PoolThread thread : threads) {
            thread.start();
        }
    }

    /**
     * 提交任务
     * @param task
     */
    public synchronized void execute(Runnable task) {
        if (this.isStopped)
            throw new IllegalStateException("ThreadPool is stopped");
        try {
            this.taskQueue.put(task);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭线程池
     * @return
     */
    public synchronized boolean stop() {
        this.isStopped = true;
        for (PoolThread thread : threads) {
            thread.interrupt();
        }
        return this.isStopped;
    }
}


package com.chinaso.search.phl;

import java.util.concurrent.BlockingQueue;

/**
 * 线程对象,他运行在线程池中
 * 
 * @author piaohailin
 * 
 */
public class PoolThread extends Thread {
    private BlockingQueue<Runnable> taskQueue = null; // 从线程池传递过来的任务队列引用
    private boolean isStopped = false;// 线程运行状态

    public PoolThread(BlockingQueue<Runnable> queue) {
        taskQueue = queue;
    }

    /**
     * 无限循环,阻塞调用
     */
    public void run() {
        while (!isStopped()) {
            try {
                Runnable runnable = taskQueue.take();//阻塞方法,如果队列里没有任务,则在此一直等待
                runnable.run();
            } catch (Exception e) {
                // 写日志或者报告异常,
                // 但保持线程池运行.
                e.printStackTrace();
            }
        }
    }

    public synchronized void toStop() {
        isStopped = true;
        this.interrupt(); // 打断池中线程的 dequeue() 调用.
    }

    public synchronized boolean isStopped() {
        return isStopped;
    }
}



线程池的实现由两部分组成。类 ThreadPool 是线程池的公开接口,而类 PoolThread 用来实现执行任务的子线程。

为了执行一个任务,方法 ThreadPool.execute(Runnable r) 用 Runnable 的实现作为调用参数。在内部,Runnable 对象被放入 阻塞队列 (Blocking Queue),等待着被子线程取出队列。

一个空闲的 PoolThread 线程会把 Runnable 对象从队列中取出并执行。你可以在 PoolThread.run() 方法里看到这些代码。执行完毕后,PoolThread 进入循环并且尝试从队列中再取出一个任务,直到线程终止。

调用 ThreadPool.stop() 方法可以停止 ThreadPool。在内部,调用 stop 先会标记 isStopped 成员变量(为 true)。然后,线程池的每一个子线程都调用 PoolThread.stop() 方法停止运行。注意,如果线程池的 execute() 在 stop() 之后调用,execute() 方法会抛出 IllegalStateException 异常。

子线程会在完成当前执行的任务后停止。注意 PoolThread.stop() 方法中调用了 this.interrupt()。它确保阻塞在 taskQueue.dequeue() 里的 wait() 调用的线程能够跳出 wait() 调用(校对注:因为执行了中断interrupt,它能够打断这个调用),并且抛出一个 InterruptedException 异常离开 dequeue() 方法。这个异常在 PoolThread.run() 方法中被截获、报告,然后再检查 isStopped 变量。由于 isStopped 的值是 true, 因此 PoolThread.run() 方法退出,子线程终止。

java执行器(Executors)
java.util.concurrent中包括三个Executor接口:

Executor,一个运行新任务的简单接口。
ExecutorService,扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法。
ScheduledExecutorService,扩展了ExecutorService。支持Future和定期执行任务。
通常来说,指向Executor对象的变量应被声明为以上三种接口之一,而不是具体的实现类。

Executor接口
Executor接口只有一个execute方法,用来替代通常创建(启动)线程的方法。例如:r是一个Runnable对象,e是一个Executor对象。
//可以使用
e.execute(r);
//来代替
(new Thread(r)).start();

但execute方法没有定义具体的实现方式。对于不同的Executor实现,execute方法可能是创建一个新线程并立即启动,但更有可能是使用已有的工作线程运行r,或者将r放入到队列中等待可用的工作线程。(我们将在线程池一节中描述工作线程。)

ExecutorService接口
ExecutorService接口在提供了execute方法的同时,新加了更加通用的submit方法。submit方法除了和execute方法一样可以接受Runnable对象作为参数,还可以接受Callable对象作为参数。使用Callable对象可以能使任务返还执行的结果。通过submit方法返回的Future对象可以读取Callable任务的执行结果,或是管理Callable任务和Runnable任务的状态。

ExecutorService也提供了批量运行Callable任务的方法。最后,ExecutorService还提供了一些关闭执行器的方法。如果需要支持即时关闭,执行器所执行的任务需要正确处理中断。

ScheduledExecutorService接口
ScheduledExecutorService扩展ExecutorService接口并添加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate方法和scheduleWithFixedDelay方法。

结果组装CompletionService
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已经完成的结果,而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
示例代码
Executor executor = Executors.newCachedThreadPool();
CompletionService<List<String>> completionService = new ExecutorCompletionService<List<String>>(executor);
completionService.submit(new Callable<List<String>>() {
    @Override
    public List<String> call() throws Exception {
        List<String> data = new ArrayList<String>();
        return data;
    }
});
List<String> result = new ArrayList<String>();
for (int i = 0; i < threadCount; i++) {
    result.addAll(completionService.take().get()); // 取得结果,如果没有返回,则阻塞
}


任务取消
一般在Executor任务框架里,任务是在线程中执行的。有固定数量的线程从阻塞的任务队列中,获取任务然后执行。任务取消有多种方式:
1.用户请求取消
2.有时间限制的操作
3.应用程序事件
4.错误
5.关闭

java线程池使用
在java.util.concurrent包中多数的执行器实现都使用了由工作线程组成的线程池,工作线程独立于所它所执行的Runnable任务和Callable任务,并且常用来执行多个任务。 使用工作线程可以使创建线程的开销最小化。

在大规模并发应用中,创建大量的Thread对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。一种最常见的线程池是固定大小的线程池。这种线程池始终有一定数量的线程在运行,如果一个线程由于某种原因终止运行了,线程池会自动创建一个新的线程来代替它。需要执行的任务通过一个内部队列提交给线程,当没有更多的工作线程可以用来执行任务时,队列保存额外的任务。 使用固定大小的线程池一个很重要的好处是可以实现优雅退化。例如一个Web服务器,每一个HTTP请求都是由一个单独的线程来处理的,如果为每一个HTTP都创建一个新线程,那么当系统的开销超出其能力时,会突然地对所有请求都停止响应。如果限制Web服务器可以创建的线程数量,那么它就不必立即处理所有收到的请求,而是在有能力处理请求时才处理。 Executors类提供了下列一下方法:
  • newCachedThreadPool方法创建了一个可扩展的线程池。适合用来启动很多短任务的应用程序。将线程池的最大大小设置为Integer.MZX_VALUE。而将基本大小设置为0,超时时间设置为1分钟。
  • newSingleThreadExecutor方法创建了每次执行一个任务的执行器。
  • newFixedThreadPool方法将线程池的基本大小和最大大小设置额外icanshuzhong指定的值。而且创建的线程池不会超时。

还有一些创建ScheduledExecutorService执行器的方法。

原始接口使用
    
/**
     * 其中
     * 第一个参数为初始空闲
     * 第二个参数为最大线程
     * 第三个参数为超过coresize的空闲线程回收等待时间
     * 第四个参数是第三个参数的时间单位
     * 第五个参数是当超过最大线程数以后,可以放在队列中的线程
     * 第六个参数
     * 第七个参数是线程池任务队列塞满时候的饱和策略
     */
    private static int                      corePoolSize    = 1;
    private static int                      maximumPoolSize = 3;
    private static long                     keepAliveTime   = 0;
    private static TimeUnit                 unit            = TimeUnit.NANOSECONDS;
    private static BlockingQueue<Runnable>  workQueue       = new ArrayBlockingQueue<Runnable>(5);
    private static ThreadFactory            threadFactory   = Executors.defaultThreadFactory();
    /**
     * AbortPolicy 如果总线成熟超过maximumPoolSize + workQueue ,则跑异常java.util.concurrent.RejectedExecutionException
     */
    private static RejectedExecutionHandler handler         = new AbortPolicy();

    private static ThreadPoolExecutor       executor        = new ThreadPoolExecutor(
            corePoolSize, 
            maximumPoolSize, 
            keepAliveTime,
            unit, 
            workQueue, 
            threadFactory, 
            handler);
    /**
     * 当keepAliveTime=0时
     * 只有线程总数>=maximumPoolSize + workQueue时,才会按照maximumPoolSize的多线程数执行
     * 否则按照corePoolSize的多线程数执行
     * @param args
     */

工厂方法使用
ExecutorService executor = Executors.newFixedThreadPool(10);

spring与线程池
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context-3.0.xsd 
http://www.springframework.org/schema/tx 
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd 
http://www.springframework.org/schema/aop  
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">
	<!-- 
	spring自带的线程池
	 -->
	<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<property name="corePoolSize" value="4" /> <!-- 并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 -->
		<property name="maxPoolSize" value="10" /> <!-- 最大线程池容量 -->
		<property name="queueCapacity" value="500" /> <!-- 超过最大线程池容量后,允许的线程队列数 -->
	</bean>
	
	
	<!-- 
	自定义线程池
	 -->
	 <!-- 枚举类型、静态属性声明 -->
	<bean id="nanoseconds" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
		<property name="staticField" value="java.util.concurrent.TimeUnit.NANOSECONDS" />
	</bean>
	<!-- 
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {	
	 -->
	<bean id="myThreadPool" class="java.util.concurrent.ThreadPoolExecutor">
		<constructor-arg index="0" value="4" />
		<constructor-arg index="1" value="10" />
		<constructor-arg index="2" value="0" />
		<constructor-arg index="3" ref="nanoseconds" />
		<constructor-arg index="4">
			<bean class="java.util.concurrent.ArrayBlockingQueue">
				<constructor-arg value="500" />
			</bean>
		</constructor-arg>
		<constructor-arg index="5">
			<!-- 此bean返回的是 java.util.concurrent.ThreadFactory-->
			<bean class="java.util.concurrent.Executors" factory-method="defaultThreadFactory" />
		</constructor-arg>
		<constructor-arg index="6">
			<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
		</constructor-arg>
	</bean>
	
<!-- 
给工厂方法传参数
<bean id="exampleBean" class="...ExampleBeanFactory"  scope="prototype"
        factory-method="createExampleBean">
    <constructor-arg value="default value"/>
</bean>
 -->	
</beans>
分享到:
评论

相关推荐

    Java并发编程:线程池的使用 - 平凡希 - 博客园1

    Java并发编程中的线程池是提高系统效率的关键工具,它解决了频繁创建和销毁线程的问题。线程池通过复用已存在的线程来处理任务,从而避免了每次任务执行完毕后销毁线程的开销。在Java中,线程池的核心实现是`java....

    23 高并发编程和线程池

    23 高并发编程和线程池,教程视频:java中高并发编程和线程池

    并发容器和线程池,java并发编程3

    `Future`接口提供了获取异步计算结果的能力,它是Java并发编程中重要的API之一。 ##### 3.1 FutureTask `FutureTask`是`Future`接口的一个具体实现,它可以包装一个`Runnable`或`Callable`对象,并允许用户获取...

    Java8并行流中自定义线程池操作示例

    Java8并行流中自定义线程池操作示例 Java8并行流中自定义线程池操作示例主要介绍了Java8并行流中自定义线程池操作,结合实例形式分析了并行流的相关概念、定义及自定义线程池的相关操作技巧。 1. 概览 Java8引入了...

    Java并发之串行线程池实例解析

    这和并发线程池不同,后者可以同时执行多个线程。 为了实现一个串行的线程池,我们需要使用Java提供的ThreadPoolExecutor类。ThreadPoolExecutor类是Java并发包中一个非常重要的类,它提供了一个线程池的实现,我们...

    Java 并发学习笔记:进程和线程,并发理论,并发关键字,Lock 体系,原子操作类,发容器 &amp; 并发工具,线程池,并发实践

    Java 并发学习笔记: 进程和线程, 并发理论, 并发关键字, Lock 体系, 原子操作类, 发容器 & 并发工具, 线程池, 并发实践 Java是一种面向对象的编程语言,由Sun Microsystems于1995年推出。它是一种跨平台的...

    java并发笔记,包含线程池

    线程池作为Java并发处理的核心工具,它的理解和运用是每个Java开发者必备的技能。 Java并发笔记可能涵盖了以下几个核心知识点: 1. **线程与进程**:在操作系统中,进程是资源分配的基本单位,而线程是执行的基本...

    java并发编程实战源码,java并发编程实战pdf,Java

    《Java并发编程实战》是Java并发编程领域的一本经典著作,它深入浅出地介绍了如何在Java平台上进行高效的多线程编程。这本书的源码提供了丰富的示例,可以帮助读者更好地理解书中的理论知识并将其应用到实际项目中。...

    java并发编程:juc线程池

    Java并发编程中的JUC线程池是Java程序员必须掌握的关键技术之一,它允许开发者高效地管理并发执行的任务,充分利用多核处理器的性能。线程池的出现解决了在并发环境中线程创建、销毁带来的开销,提高了系统资源的...

    java线程池完整代码

    Java 线程池是 Java 语言中的一个重要概念,它允许开发者创建和管理多个线程,以提高程序的并发性和性能。下面是对给定文件的解析,包括 title、description、标签和部分内容的解析。 标题解析 标题 "Java 线程池...

    Java 并发编程实战.pdf

    《Java并发编程实战》这本书是关于Java语言中并发编程技术的经典著作。它详细介绍了如何在Java环境中有效地实现多线程程序和并发控制机制。在Java平台上,由于其本身提供了强大的并发编程支持,因此,掌握并发编程...

    JAVA 写的SOCKET线程池

    Java中的Socket线程池是一种高效的网络编程模型,它结合了Socket通信和多线程技术,以提高服务端处理客户端请求的并发性能。对于初学者来说,理解并掌握这个概念至关重要,因为这能帮助他们构建更稳定、可扩展的网络...

    JAVA集中常用的线程池比较.pdf

    Java线程池是一种高效管理线程的工具,它允许开发者预先配置一组线程,以便在处理并发任务时能更好地控制系统的资源。线程池的概念源于服务器应用程序中对大量短小任务处理的需求,避免频繁创建和销毁线程带来的性能...

    java并发编程内部分享PPT

    总的来说,这份“java并发编程内部分享PPT”涵盖了Java并发编程的多个重要方面,包括线程创建与管理、同步机制、并发容器、线程池、并发问题以及异步计算。通过深入学习和实践这些知识点,开发者可以更好地应对多...

    java线程池封装j

    Java线程池封装是Java并发编程中重要的一环,合理的线程池配置和封装能显著提升程序的性能和稳定性。理解线程池的工作原理,根据业务需求选择合适的参数,以及正确处理拒绝策略,都是实现高效并发处理的关键。在实际...

    java并发编程2

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。以下是对标题和描述中所提及的几个知识点的详细解释: 1. **线程与并发** - **线程*...

    《java 并发编程实战高清PDF版》

    《Java并发编程实战》是一本深入探讨Java平台并发编程的权威指南。这本书旨在帮助开发者理解和掌握在Java环境中创建高效、可扩展且可靠的多线程应用程序的关键技术和实践。它涵盖了从基本概念到高级主题的广泛内容,...

    Java实现的线程池、消息队列功能

    标题中的“Java实现的...总的来说,这个主题涵盖了Java并发编程中的核心概念,线程池和消息队列的实现原理及应用场景,以及可能用到的相关开发工具。理解并熟练运用这些知识对于提升Java应用的性能和稳定性至关重要。

Global site tag (gtag.js) - Google Analytics