`
coolxing
  • 浏览: 874762 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
9a45b66b-c585-3a35-8680-2e466b75e3f8
Java Concurre...
浏览量:97515
社区版块
存档分类
最新评论

配置ThreadPoolExecutor

阅读更多

[本文是我对Java Concurrency In Practice C08的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ]

Executors的静态方法newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool所返回的线程池都是ThreadPoolExecutor对象或者其子类对象. ThreadPoolExecutor提供了多种配置, 可以根据实际定制合适的线程池.

 

线程的创建和销毁

ThreadPoolExecutor构造函数中的corePoolSize, maximumPoolSize, keepAliveTime参数与线程的创建和销毁相关. 

corePoolSize指定ThreadPoolExecutor中持有的核心线程数, 除非task队列已满, ThreadPoolExecutor不会创建超过核心线程数的线程(corePoolSize为0时是一种特殊情况, 此时就算task队列没有饱和, 向线程池第一次提交task时仍然会创建新的线程), 核心线程一旦创建就不会销毁, 除非设置了allowCoreThreadTimeOut(true), 或者关闭线程池.

maximumPoolSize指定线程池中持有的最大线程数. 对于超过核心线程数的线程, 如果在指定的超时时间内没有使用到, 就会被销毁.

keepAliveTime指定超时时间.

Executors类的静态方法创建线程池的源码:

public static ExecutorService newCachedThreadPool() {
	// 核心线程数为0, 最大线程数为Integer.MAX_VALUE, 超时时间为60s
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
	// 核心线程数和最大线程数都为调用方指定的值nThreads, 超时时间为0
	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<Runnable>());
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
	// 核心线程数由调用方指定, 最大线程数为Integer.MAX_VALUE, 超时时间为0
	return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue());
} 

 

task队列

线程池内部持有一个task队列, 当task的提交速度超过task的执行速度时, task将被缓存在task队列中等待有线程可用时再执行. ThreadPoolExecutor在创建时可以为其指定task队列, 开发者一般有三种选择: 有界队列, 无界队列以及同步队列. Executors.newFixedThreadPool和Executors.newScheduledThreadPool返回的ThreadPoolExecutor对象使用的是无界队列, 而Executors.newCashedThreadPool返回的ThreadPoolExecutor对象使用的是同步队列.

为线程数不多的线程池指定一个容量大的队列(或者无界队列), 有助于减少线程间切换, CPU等方面的消耗, 代价是可能会造成吞吐量下降. 如果使用的是有界队列, 队列可能会被填满, 此时将根据指定的饱和策略进行处理(见之后的讲述).

对于线程数很大的线程池, 可以使用同步队列. 同步队列(SynchronousQueue)其实不能算是一种队列, 因为同步队列没有缓存的作用. 使用同步队列时, task被提交时, 直接由线程池中的线程接手. 如果此时线程池中没有可用的线程, 线程池将创建新的线程接手. 如果线程池无法创建新的线程(比如线程数已到达maximumPoolSize), 则根据指定的饱和策略进行处理(同样见之后的讲述).

 

饱和策略

如果线程池使用的是有界队列, 那么当有界队列满时继续提交task时饱和策略会被触发.

如果线程池使用的是同步队列, 那么当线程池无法创建新的线程接手task时饱和策略会被触发.

如果线程池被关闭后, 仍然向其提交task时, 饱和策略也会被触发.

ThreadPoolExecutor.setRejectedExecutionHandler方法用于设定饱和策略. 该方法接受一个RejectedExecutionHandler对象作为参数. RejectedExecutionHandler只定义了一个方法:rejectedExecution(Runnable r, ThreadPoolExecutor executor). rejectedExecution方法在饱和策略被触发时由系统回调.

ThreadPoolExecutor类中预定义了多个RejectedExecutionHandler的实现类: AbortPolicy, CallerRunsPolicy, DiscardPolicy, 和DiscardOldestPolicy.

AbortPolicy是默认的饱和策略, 其rejectedExecution方法为:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	throw new RejectedExecutionException();
} 

可见默认情况下, 触发饱和策略时将抛出RejectedExecutionException异常.

CallerRunsPolicy. 饱和时将在提交task的线程中执行task, 而不是由线程池中的线程执行:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	if (!e.isShutdown()) {
		r.run();
	}
}

使用CallerRunsPolicy的例子:

class LifecycleWebServer {
	// MAX_THREAD_COUNT和MAX_QUEUE_COUNT的值根据系统的实际情况确定
	private static final int MAX_THREAD_COUNT = 100;
	private static final int MAX_QUEUE_COUNT = 1000;

	// 使用有界队列作为task队列, 当有界队列满时, 将触发饱和策略
	private final ThreadPoolExecutor exec = new ThreadPoolExecutor(0, MAX_THREAD_COUNT, 60L, TimeUnit.SECONDS,
			new ArrayBlockingQueue<Runnable>(MAX_QUEUE_COUNT));

	public void start() throws IOException {
		// 设置饱和策略为CallerRunsPolicy
		exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		ServerSocket socket = new ServerSocket(80);
		while (!exec.isShutdown()) {
			try {
				final Socket conn = socket.accept();
				exec.execute(new Runnable() {
					public void run() {
						handleRequest(conn);
					}
				});
			} catch (RejectedExecutionException e) {
				if (!exec.isShutdown())
					log("task submission rejected", e);
			}
		}
	}

	public void stop() {
		exec.shutdown();
	}

	void handleRequest(Socket connection) {
		Request req = readRequest(connection);
		if (isShutdownRequest(req))
			stop();
		else
			dispatchRequest(req);
	}
	
	public static void main(String[] args) {
		LifecycleWebServer server = new LifecycleWebServer();
		try {
			// 在main线程中启动server
			server.start();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
} 

LifecycleWebServer中的线程池使用CallerRunsPolicy作为其饱和策略. 如果线程池饱和时main线程仍然向线程池提交task, 那么task将在main中执行. main线程执行task是需要一定时间的, 这样就给了线程池喘息的机会, 而且main线程在执行task的时间内无法接受socket连接, 因此socket连接请求将缓存在tcp层. 如果server过载持续的时间较长, 使得tcp层的缓存不够, 那么tcp缓存将根据其策略丢弃部分请求. 如此一来, 整个系统的过载压力逐步向外扩散: 线程池-线程池中的队列-main线程-tcp层-client. 这样的系统在发生过载时是比较优雅的: 既不会因为过多的请求而导致系统资源耗尽, 也不会一发生过载时就拒绝服务, 只有发生长时间系统过载时才会出现客户端无法连接的情况.

DiscardPolicy. 该策略将最新提交的task丢弃:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	// 丢弃, 不做任何处理
} 

DiscardOldestPolicy. 该策略丢弃队列中处于对头的task, 且试着再次提交最新的task:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
	e.getQueue().poll();
	e.execute(r);
    }
} 

DiscardOldestPolicy与PriorityBlockingQueue结合使用时可能会造成不好的结果, 因为PriorityBlockingQueue中位于对头的task是优先级最高的task, 发生饱和时反而首先丢弃优先级高的task可能不符合需求.

ThreadPoolExecutor没有提供饱和时阻塞的策略, 不过开发者可以结合Semaphore实现:

public class BoundedExecutor {
	private final Executor exec;
	private final Semaphore semaphore;

	public BoundedExecutor(Executor exec, int bound) {
		this.exec = exec;
		// 设定信号量permit的上限
		this.semaphore = new Semaphore(bound);
	}

	public void submitTask(final Runnable command) throws InterruptedException {
		// 提交task前先申请permit, 如果无法申请到permit, 调用submitTask的线程将被阻塞, 直到有permit可用
		semaphore.acquire();
		try {
			exec.execute(new Runnable() {
				public void run() {
					try {
						command.run();
					} finally {
						// 提交成功了, 运行task后释放permit
						semaphore.release();
					}
				}
			});
		} catch (RejectedExecutionException e) {
			// 如果没有提交成功, 也需要释放permit
			semaphore.release();
		}
	}
}

 

ThreadFactory

在创建ThreadPoolExecutor时还可以为其指定ThreadFactory, 当线程池需要创建新的线程时会调用ThreadFactory的newThread方法. 默认的ThreadFactory创建的线程是nonDaemon, 线程优先级为NORM_PRIORITY的线程, 并且为其指定了可识别的线程名称:

public Thread newThread(Runnable r) {
	Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
	if (t.isDaemon())
		t.setDaemon(false);
	if (t.getPriority() != Thread.NORM_PRIORITY)
		t.setPriority(Thread.NORM_PRIORITY);
	return t;
} 

开发者可以根据自身需要为ThreadPoolExecutor指定自定义的ThreadFactory. 例如:

public class MyThreadFactory implements ThreadFactory {
	private final String poolName;

	public MyThreadFactory(String poolName) {
		this.poolName = poolName;
	}

	public Thread newThread(Runnable runnable) {
		return new MyAppThread(runnable, poolName);
	}
}

public class MyAppThread extends Thread {
	public static final String DEFAULT_NAME = "MyAppThread";
	private static volatile boolean debugLifecycle = false;
	private static final AtomicInteger created = new AtomicInteger();
	private static final AtomicInteger alive = new AtomicInteger();
	private static final Logger log = Logger.getAnonymousLogger();

	public MyAppThread(Runnable r) {
		this(r, DEFAULT_NAME);
	}

	public MyAppThread(Runnable runnable, String name) {
		// 为自定义的Thread类指定线程名称
		super(runnable, name + "-" + created.incrementAndGet());
		// 设置UncaughtExceptionHandler. UncaughtExceptionHandler的uncaughtException方法将在线程运行中抛出未捕获异常时由系统调用
		setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
			public void uncaughtException(Thread t, Throwable e) {
				log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
			}
		});
	}

	public void run() {
		// Copy debug flag to ensure consistent value throughout. 
		boolean debug = debugLifecycle;
		if (debug)
			log.log(Level.FINE, "Created " + getName());
		try {
			alive.incrementAndGet();
			super.run();
		} finally {
			alive.decrementAndGet();
			if (debug)
				log.log(Level.FINE, "Exiting " + getName());
		}
	}

	public static int getThreadsCreated() {
		return created.get();
	}

	public static int getThreadsAlive() {
		return alive.get();
	}

	public static boolean getDebug() {
		return debugLifecycle;
	}

	public static void setDebug(boolean b) {
		debugLifecycle = b;
	}
}

 

扩展ThreadPoolExecutor

ThreadPoolExecutor类提供了多个"钩子"方法, 以供其子类实现, 比如beforeExecute, afterExecute, terminated等. 所谓"钩子"是指基类预留的, 但是没有提供具体实现的方法, 其方法体为空. 子类可以根据需要为"钩子"提供具体实现.

beforeExecute和afterExecute方法分别在执行task前后调用:

private void runTask(Runnable task) {
	final ReentrantLock runLock = this.runLock;
	runLock.lock();
	try {
		if (runState < STOP && Thread.interrupted() && runState >= STOP)
			thread.interrupt();
		boolean ran = false;
		beforeExecute(thread, task);
		try {
			task.run();
			ran = true;
			afterExecute(task, null);
			++completedTasks;
		} catch (RuntimeException ex) {
			if (!ran)
				afterExecute(task, ex);
			throw ex;
		}
	} finally {
		runLock.unlock();
	}
} 

beforeExecute和afterExecute方法可以用于记录日志, 统计数据等操作.

terminated方法在线程池被关闭后调用. terminated方法可以用于释放线程池申请的资源.

 

4
1
分享到:
评论

相关推荐

    redis lits queue 和 ThreadPoolExecutor 结合

    - **线程池配置**:根据系统资源和需求,合理配置ThreadPoolExecutor的参数,如核心线程数、最大线程数、队列容量和超时策略等。 5. **优点**: - **解耦**:将任务存储和执行分开,使得系统更灵活,易于扩展。 ...

    Spring线程池ThreadPoolExecutor配置并且得到任务执行的结果

    在Spring中,我们可以使用ThreadPoolTaskExecutor来配置ThreadPoolExecutor。ThreadPoolTaskExecutor是Spring提供的一种线程池实现,基于ThreadPoolExecutor,它提供了诸如线程池大小、队列大小、线程存活时间等参数...

    Java并发编程part2

    8.3 配置threadpoolexecutor 8.4 扩展threadpoolexecutor 8.5 并行递归算法 第9章 gui应用程序 9.1 为什么gui是单线程化的 9.2 短期的gui任务 9.3 耗时gui任务 9.4 共享数据模型 9.5 其他形式的单线程子系统 第3部分...

    JAVA并发编程实践_中文版(1-16章全)_1/4

    8.3 配置threadpoolexecutor 8.4 扩展threadpoolexecutor 8.5 并行递归算法 第9章 gui应用程序 9.1 为什么gui是单线程化的 9.2 短期的gui任务 9.3 耗时gui任务 9.4 共享数据模型 9.5 其他形式的单线程子系统 第3部分...

    Java并发编程实践part1

    8.3 配置threadpoolexecutor 8.4 扩展threadpoolexecutor 8.5 并行递归算法 第9章 gui应用程序 9.1 为什么gui是单线程化的 9.2 短期的gui任务 9.3 耗时gui任务 9.4 共享数据模型 9.5 其他形式的单线程子系统 第3部分...

    Java多线程实现.pdf

    通过配置ThreadPoolExecutor对象,可以管理线程池的大小、工作队列、线程工厂和拒绝策略等参数。文档提到了两种线程池的创建方法:newCachedThreadPool和newFixedThreadPool。newCachedThreadPool创建一个可缓存的...

    2011.08.30(2)——— java BlockingQueue ExecutorService

    4. 如何创建和配置ThreadPoolExecutor。 5. 使用BlockingQueue与ExecutorService协同工作的示例代码。 6. 生产者-消费者模型在并发编程中的应用。 这些内容对于理解和优化Java并发程序的性能至关重要,也是面试和...

    Java并发编程实践 PDF 高清版

    8.3 配置ThreadPoolExecutor 8.4 扩展ThreadPoolExecutor 8.5 并行递归算法 第9章 GUI应用程序 9.1 为什么GUI是单线程化的 9.2 短期的GUI任务 9.3 耗时GUI任务 9.4 共享数据模型 9.5 其他形式的单线程子系统 第3部分...

    ThreadPoolExecutor源码解析.pdf

    《ThreadPoolExecutor源码解析》 ThreadPoolExecutor是Java并发编程中重要的组件,它是ExecutorService接口的实现,用于管理和调度...在实际开发中,合理配置线程池参数,避免线程池过载,是保障系统稳定运行的关键。

    spring线程池ThreadPoolExecutor配置以及FutureTask的使用

    这个类是Spring对Java内置的`java.util.concurrent.ThreadPoolExecutor`的封装,允许开发者在Spring应用上下文中声明式地定义线程池。在本篇文章中,我们将深入探讨`ThreadPoolTaskExecutor`的配置及其使用,并结合`...

    Java Concurrency in Practice

    - **8.3 配置ThreadPoolExecutor**:介绍了ThreadPoolExecutor类的主要配置选项。 - **8.4 扩展ThreadPoolExecutor**:探讨了如何自定义ThreadPoolExecutor以满足特定需求。 - **8.5 并行递归**:介绍了一种通过...

    Java.Concurrency.in.Practice.pdf

    - **8.3 配置 ThreadPoolExecutor**:深入讨论了 `ThreadPoolExecutor` 类的配置选项,以及如何根据应用场景进行定制。 - **8.4 扩展 ThreadPoolExecutor**:探讨了如何通过继承 `ThreadPoolExecutor` 来实现自定义...

    Java并发编程(学习笔记).xmind

    配置ThreadPoolExecutor(自定义的线程池) 此处需要注意系统默认提供的线程池是如何配置的 扩展ThreadPoolExector GUI应用程序探讨 活跃度(Liveness)、性能、测试 避免活跃性危险 死锁 锁...

    线程池之ThreadPoolExecutor.docx

    线程池的配置需要根据实际应用的需求进行,包括核心线程数、最大线程数、队列容量以及饱和策略的选择,都需要谨慎考虑,以保证线程池既能高效运行,又能防止资源过度消耗。同时,合理的线程池管理能够帮助开发者调试...

    Java并发编程实战

    8.3 配置ThreadPoolExecutor141 8.3.1 线程的创建与销毁142 8.3.2 管理队列任务142 8.3.3 饱和策略144 8.3.4 线程工厂146 8.3.5 在调用构造函数后再定制ThreadPoolExecutor147 8.4 扩展 ThreadPoolExecutor...

    ThreadPoolExecutor运转机制介绍

    `ThreadPoolExecutor` 提供了一种灵活的方式来配置线程池,通过控制线程的数量、队列的行为以及对超出容量的任务的处理方式,可以实现高性能的应用程序。 `ThreadPoolExecutor` 的构造函数如下: ```java public ...

    Java并发编程之ThreadPoolExecutor详解与实战

    主要涵盖ThreadPoolExecutor的基础概念介绍,创建配置参数的意义与选择方法,以及在实际编程中的几种典型应用场景,如任务的异步处理和周期定时任务调度。通过实例演示了如何利用ThreadPoolExecutor构建高效稳定的...

    java ThreadPoolExecutor 并发调用实例详解

    在这个示例中,我们首先创建了一个 ThreadPoolExecutor 对象,并将其配置为拥有 8 个核心线程和 12 个最大线程,keep alive 时间为 30 秒,并使用 ArrayBlockingQueue 作为任务队列。 接着,我们创建了 10 个 ...

    4_ThreadPoolExecutor源码阅读1

    如果你打算自己手动配置和调整ThreadPoolExecutor类时,建议先阅读一下下面的注意事项:Core and maximum pool sizesA T

    SpringBoot 多任务并行+线程池处理的实现

    我们可以根据需要配置 ThreadPoolExecutor,提高系统的性能和可扩展性。 结论 SpringBoot 多任务并行+线程池处理是 SpringBoot 框架中的一种高效的并发处理机制。它可以提高系统的性能和可扩展性,提高系统的吞吐...

Global site tag (gtag.js) - Google Analytics