`

ThreadPoolExecutor 线程池源码分析-基于jdk8

    博客分类:
  • Java
阅读更多

测试demo, ThreadPoolExecutorTest:

 

public class ThreadPoolExecutorTest {
	public static void main(String[] args) throws InterruptedException {
		final boolean isFair = false;
		ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(10, isFair);
//		arrayBlockingQueue.add(new MyThreadTask(10086));
		
		final int corePoolSize = 3;
		final int maximumPoolSize = 6;
		ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
				corePoolSize, maximumPoolSize,
		        1, TimeUnit.SECONDS,
		        arrayBlockingQueue,
		        new ThreadPoolExecutor.CallerRunsPolicy());
		
//		threadPool.allowCoreThreadTimeOut(true);
//		Integer result = 9;
		for (int index = 1; index <= 10; index++) {
			Thread tempNewThread = new MyThreadTask(index);
		    threadPool.execute(tempNewThread);
//			result = threadPool.submit(new MyThreadTask(i), result);
		}
	    
//	    threadPool.shutdown();
	}
}

ThreadPoolExecutor 抽出来的一些核心方法:

 

 

public class ThreadPoolExecutor extends AbstractExecutorService {

	private final BlockingQueue<Runnable> workQueue;
	private final ReentrantLock mainLock = new ReentrantLock();
	private final HashSet<Worker> workers = new HashSet<Worker>();
	
	/***
	 * 线程中真正执行的Worker线程
	 */
	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	/** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        
		Worker(Runnable firstTask) {
			setState(-1); // inhibit interrupts until runWorker
			this.firstTask = firstTask;
			this.thread = getThreadFactory().newThread(this);
		}

		/***
		 * 代理, 执行上层的runWorker方法;
		 */
		public void run() {
			runWorker(this);
		}
	}

	/***
	 * 把firstTask 添加到核心线程, 并启动;
	 * @param firstTask
	 * @param core 是否是核心线程
	 * @return
	 */
	private boolean addWorker(Runnable firstTask, boolean core) {
		boolean workerStarted = false;
		boolean workerAdded = false;
		Worker w = null;
		try {
			w = new Worker(firstTask);
			final Thread t = w.thread;
			if (t != null) {
				int rs = runStateOf(ctl.get());
				if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
				if (workerAdded) {
					t.start();
					workerStarted = true;
				}
			}
		} finally {
			if (!workerStarted)
				addWorkerFailed(w);
		}
		return workerStarted;
	}

	/***
	 * 从 workQueue 的线程等待队列中获取线程(后面准备执行);
	 * @return
	 */
	private Runnable getTask() {
		boolean timedOut = false; // Did the last poll() time out?
		for (;;) {
			try {
				Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
				if (r != null)
					return r;
				timedOut = true;
			} catch (InterruptedException retry) {
				timedOut = false;
			}
		}
	}

	/***
	 * 运行Worker线程; 
	 * 
	 * while (task != null || (task = getTask()) != null) 
	 * 第一次判断是当前的核心线程;
	 * 第二个判断是核心线程第一次执行完毕, 则从workQueue中获取线程继续执行;
	 * 
	 * task.run();
	 * 直接调用的run方法(外层已经有worker的线程包装起的)
	 */
	final void runWorker(Worker w) {
		Thread wt = Thread.currentThread();
		Runnable task = w.firstTask;
		w.firstTask = null;
		w.unlock(); // allow interrupts
		boolean completedAbruptly = true;
		try {
			while (task != null || (task = getTask()) != null) {
				w.lock();
				// If pool is stopping, ensure thread is interrupted;
				// if not, ensure thread is not interrupted. This
				// requires a recheck in second case to deal with
				// shutdownNow race while clearing interrupt
				if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
						&& !wt.isInterrupted())
					wt.interrupt();
				try {
					beforeExecute(wt, task);
					Throwable thrown = null;
					try {
						task.run();
					} catch (RuntimeException x) {
						thrown = x;
						throw x;
					} catch (Error x) {
						thrown = x;
						throw x;
					} catch (Throwable x) {
						thrown = x;
						throw new Error(x);
					} finally {
						afterExecute(task, thrown);
					}
				} finally {
					task = null;
					w.completedTasks++;
					w.unlock();
				}
			}
			completedAbruptly = false;
		} finally {
			processWorkerExit(w, completedAbruptly);
		}
	}

	/***
	 * 执行, 线程池执行线程的总入口
	 * @param command
	 */
	public void execute(Runnable command) {
		int c = ctl.get();
		// 核心线程执行
		if (workerCountOf(c) < corePoolSize) {
			if (addWorker(command, true))
				return;
			c = ctl.get();
		}
		// 核心已经多了, 添加到 workQueue
		if (isRunning(c) && workQueue.offer(command)) {
			int recheck = ctl.get();
			if (!isRunning(recheck) && remove(command))
				reject(command);
			else if (workerCountOf(recheck) == 0)
				addWorker(null, false);
		} else if (!addWorker(command, false))
			reject(command);
	}

	/**
	 * 其中一个拒绝策略
	 */
	public static class CallerRunsPolicy implements RejectedExecutionHandler {
		public CallerRunsPolicy() {
		}
		public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
			if (!e.isShutdown()) {
				r.run();
			}
		}
	}
}

 

分享到:
评论

相关推荐

    JDK之ThreadPoolExecutor源码分析1

    《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...

    java线程池概念.txt

    3:对线程池的基本使用及其部分源码的分析(注意:这里的源码分析是基于jdk1.6;) a:线程池的状态 volatile int runState; static final int RUNNING = 0; 运行状态 static final int SHUTDOWN = 1; 关闭状态;...

    JDK 性能

    此外,线程池(ThreadPoolExecutor)的源码分析也有助于我们合理配置线程池参数,避免资源浪费。 其次,JDK提供了一系列的性能监控和诊断工具。例如,JConsole和VisualVM是两个常用的JVM监视工具,可以实时查看CPU...

    基于线程池的工作原理与源码解读

    基于线程池的工作原理与源码解读 本篇文章主要讲解了基于线程池的工作原理与源码解读,旨在帮助开发者更好地理解线程池的工作机制和实现原理。 一、线程池创建 线程池的创建主要涉及到ThreadPoolExecutor类的构造...

    JavaCommon:Java基础用法,集合,线程,JUC,jdk5--8各个版本特性。

    该项目还深入解析了从JDK 5到JDK 8各版本的重要特性,为开发者提供了丰富的代码示例和源码分析。 1. **Java基础用法**: - **变量与数据类型**:Java支持基本数据类型如int、float、char等,以及引用类型如类、...

    深入理解高并发编程-核心技术原理

    书中还特别提及了线程池的创建和管理,以及通过ThreadPoolExecutor类的源码分析,揭示线程池的工作原理。 在**基础案例篇**,作者通过实际问题,如并发编程中的常见问题和挑战,引导读者理解并发编程中的可见性和...

    java精通程度应该会什么

    #### 六、JDK源码分析 深入理解JDK源码可以帮助开发者更好地理解Java语言的底层实现。 - **重要源码分析**: - List、Map、Set等集合类的实现细节。 - 多线程相关的类如`Thread`、`Lock`等。 - I/O相关的类如`...

    java并发源码集合-DeepLeaner_Java:Java集合框架源码,多线程,jdk5新特性泛型,并发知识总结学习

    这个名为"DeepLeaner_Java"的资源集合主要涵盖了Java集合框架的源码分析,多线程技术,以及JDK5引入的新特性——泛型,并发知识的综合总结。以下是对这些知识点的详细讲解: 1. **Java集合框架源码**:Java集合框架...

    backport-util-concurrent_java_backport_源码.zip

    《backport-util-concurrent_java_backport_源码分析》 backport-util-concurrent是一个Java库,它的主要目的是将Java 5中的并发工具类(java.util.concurrent)回移植到Java 1.3和1.4等早期版本。这个库使得开发者...

    ArchKnowledgeTree:架构师知识谱系梳理,包含Java core, JUC, JVM, MySQL,MQ, redis,分布式相关等各种知识点整理。是我按个人理解学习、整理出的一个知识库

    ArchKnowledgeTree ... 源码分析之Java线程池ThreadPoolExecutor 常见工具 [源码分析之Guava RateLimiter源码分析](source_code/源码分析之Guava RateLimiter源码分析.md) 源码分析之netty线程模型 Mes

    java基础课程学习资源

    - 线程池的使用:`ExecutorService`、`ThreadPoolExecutor`等。 - 同步机制:锁(`synchronized`)、显式锁(`ReentrantLock`)等。 - 并发工具类:`ConcurrentHashMap`、`CountDownLatch`等。 #### 八、总结与展望 ...

    JAVA 面试题

    - 线程池的使用,如ExecutorService和ThreadPoolExecutor。 - volatile关键字和原子类的理解,如AtomicInteger、AtomicReference等。 - Lock接口和ReentrantLock的使用。 10. **Java 8及更高版本的新特性**: -...

    基于百度地图Android SDK的外卖送餐配送线路规划程序.zip

    6. **源码分析**: - `code`目录下的源码应该包含了上述所有功能的实现,通过阅读源码,我们可以深入理解每个部分的具体实现细节,如如何调用百度地图API,如何处理定位数据,以及如何实现路线规划和优化。 在实际...

    java源码笔记

    - 核心库分析:深入JDK源码,如Collections、Stream、反射等模块,了解其内部实现机制。 - 设计模式:学习Java源码中常见的设计模式,如单例、工厂、观察者等,提升代码质量。 - 内存模型:理解JVM内存结构,如堆...

    Java进阶知识点汇总.pdf

    ### Java进阶知识点详解 #### 第一章:基本语法 ##### 关键字 - **static**:用于定义...以上是对Java进阶知识点的详细解析,覆盖了基本语法、JDK源码分析等多个方面,有助于深入理解Java语言的核心机制及高级特性。

    JAVA后端架构师.pdf

    6. 线程池基础知识:CachedThreadPool、FixThreadPool、SingleThreadPool、ScheduledThreadPool、ThreadPoolExecutor详解、自定义线程池七大参数详解、线程池任务提交、线程池任务执行、线程池参数调节、线程池监控...

    java+多线程+同步详解源码整理

    这里我们将深入探讨这两个主题,并结合源码分析,以帮助你更好地理解和掌握。 1. **Java多线程** Java多线程允许程序同时执行多个独立的线程,从而提高计算机系统的资源利用率和程序的响应速度。Java提供了两种...

    java详细笔记

    - 学习线程池ExecutorService、ThreadPoolExecutor和Future接口,以提高多线程效率。 6. **输入/输出流** - Java的I/O系统用于处理数据的读写,包括字节流、字符流、对象流和缓冲流。 - 文件操作、网络通信和...

    各软件公司笔试面试题整理

    在IT行业的求职过程中,...以上只是部分核心知识点,实际的笔试面试题可能还会涵盖JVM内存管理、并发编程、性能调优、源码分析等更多内容。通过深入学习和练习这些知识点,可以有效提升你在Java笔试和面试中的竞争力。

    java技术指南

    在JDK8源码分析章节中,文档首先介绍了Unsafe类,它提供了硬件级别的操作支持,然后转向了java.lang包下的各个核心类,如String、Thread、ThreadLocal等。对于java.util包下的集合框架,文档不仅介绍了各种集合类,...

Global site tag (gtag.js) - Google Analytics