测试demo, ThreadPoolExecutorTest:
public class ThreadPoolExecutorTest { public static void main(String[] args) throws InterruptedException { final boolean isFair = false; ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(10, isFair); // arrayBlockingQueue.add(new MyThreadTask(10086)); final int corePoolSize = 3; final int maximumPoolSize = 6; ThreadPoolExecutor threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, 1, TimeUnit.SECONDS, arrayBlockingQueue, new ThreadPoolExecutor.CallerRunsPolicy()); // threadPool.allowCoreThreadTimeOut(true); // Integer result = 9; for (int index = 1; index <= 10; index++) { Thread tempNewThread = new MyThreadTask(index); threadPool.execute(tempNewThread); // result = threadPool.submit(new MyThreadTask(i), result); } // threadPool.shutdown(); } }
ThreadPoolExecutor 抽出来的一些核心方法:
public class ThreadPoolExecutor extends AbstractExecutorService { private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); /*** * 线程中真正执行的Worker线程 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /*** * 代理, 执行上层的runWorker方法; */ public void run() { runWorker(this); } } /*** * 把firstTask 添加到核心线程, 并启动; * @param firstTask * @param core 是否是核心线程 * @return */ private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } /*** * 从 workQueue 的线程等待队列中获取线程(后面准备执行); * @return */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } /*** * 运行Worker线程; * * while (task != null || (task = getTask()) != null) * 第一次判断是当前的核心线程; * 第二个判断是核心线程第一次执行完毕, 则从workQueue中获取线程继续执行; * * task.run(); * 直接调用的run方法(外层已经有worker的线程包装起的) */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } /*** * 执行, 线程池执行线程的总入口 * @param command */ public void execute(Runnable command) { int c = ctl.get(); // 核心线程执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 核心已经多了, 添加到 workQueue if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } /** * 其中一个拒绝策略 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } }
相关推荐
《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...
3:对线程池的基本使用及其部分源码的分析(注意:这里的源码分析是基于jdk1.6;) a:线程池的状态 volatile int runState; static final int RUNNING = 0; 运行状态 static final int SHUTDOWN = 1; 关闭状态;...
此外,线程池(ThreadPoolExecutor)的源码分析也有助于我们合理配置线程池参数,避免资源浪费。 其次,JDK提供了一系列的性能监控和诊断工具。例如,JConsole和VisualVM是两个常用的JVM监视工具,可以实时查看CPU...
基于线程池的工作原理与源码解读 本篇文章主要讲解了基于线程池的工作原理与源码解读,旨在帮助开发者更好地理解线程池的工作机制和实现原理。 一、线程池创建 线程池的创建主要涉及到ThreadPoolExecutor类的构造...
书中还特别提及了线程池的创建和管理,以及通过ThreadPoolExecutor类的源码分析,揭示线程池的工作原理。 在**基础案例篇**,作者通过实际问题,如并发编程中的常见问题和挑战,引导读者理解并发编程中的可见性和...
该项目还深入解析了从JDK 5到JDK 8各版本的重要特性,为开发者提供了丰富的代码示例和源码分析。 1. **Java基础用法**: - **变量与数据类型**:Java支持基本数据类型如int、float、char等,以及引用类型如类、...
#### 六、JDK源码分析 深入理解JDK源码可以帮助开发者更好地理解Java语言的底层实现。 - **重要源码分析**: - List、Map、Set等集合类的实现细节。 - 多线程相关的类如`Thread`、`Lock`等。 - I/O相关的类如`...
这个名为"DeepLeaner_Java"的资源集合主要涵盖了Java集合框架的源码分析,多线程技术,以及JDK5引入的新特性——泛型,并发知识的综合总结。以下是对这些知识点的详细讲解: 1. **Java集合框架源码**:Java集合框架...
《backport-util-concurrent_java_backport_源码分析》 backport-util-concurrent是一个Java库,它的主要目的是将Java 5中的并发工具类(java.util.concurrent)回移植到Java 1.3和1.4等早期版本。这个库使得开发者...
ArchKnowledgeTree ... 源码分析之Java线程池ThreadPoolExecutor 常见工具 [源码分析之Guava RateLimiter源码分析](source_code/源码分析之Guava RateLimiter源码分析.md) 源码分析之netty线程模型 Mes
- 线程池的使用:`ExecutorService`、`ThreadPoolExecutor`等。 - 同步机制:锁(`synchronized`)、显式锁(`ReentrantLock`)等。 - 并发工具类:`ConcurrentHashMap`、`CountDownLatch`等。 #### 八、总结与展望 ...
- 线程池的使用,如ExecutorService和ThreadPoolExecutor。 - volatile关键字和原子类的理解,如AtomicInteger、AtomicReference等。 - Lock接口和ReentrantLock的使用。 10. **Java 8及更高版本的新特性**: -...
6. **源码分析**: - `code`目录下的源码应该包含了上述所有功能的实现,通过阅读源码,我们可以深入理解每个部分的具体实现细节,如如何调用百度地图API,如何处理定位数据,以及如何实现路线规划和优化。 在实际...
- 核心库分析:深入JDK源码,如Collections、Stream、反射等模块,了解其内部实现机制。 - 设计模式:学习Java源码中常见的设计模式,如单例、工厂、观察者等,提升代码质量。 - 内存模型:理解JVM内存结构,如堆...
### Java进阶知识点详解 #### 第一章:基本语法 ##### 关键字 - **static**:用于定义...以上是对Java进阶知识点的详细解析,覆盖了基本语法、JDK源码分析等多个方面,有助于深入理解Java语言的核心机制及高级特性。
6. 线程池基础知识:CachedThreadPool、FixThreadPool、SingleThreadPool、ScheduledThreadPool、ThreadPoolExecutor详解、自定义线程池七大参数详解、线程池任务提交、线程池任务执行、线程池参数调节、线程池监控...
这里我们将深入探讨这两个主题,并结合源码分析,以帮助你更好地理解和掌握。 1. **Java多线程** Java多线程允许程序同时执行多个独立的线程,从而提高计算机系统的资源利用率和程序的响应速度。Java提供了两种...
- 学习线程池ExecutorService、ThreadPoolExecutor和Future接口,以提高多线程效率。 6. **输入/输出流** - Java的I/O系统用于处理数据的读写,包括字节流、字符流、对象流和缓冲流。 - 文件操作、网络通信和...
在IT行业的求职过程中,...以上只是部分核心知识点,实际的笔试面试题可能还会涵盖JVM内存管理、并发编程、性能调优、源码分析等更多内容。通过深入学习和练习这些知识点,可以有效提升你在Java笔试和面试中的竞争力。
在JDK8源码分析章节中,文档首先介绍了Unsafe类,它提供了硬件级别的操作支持,然后转向了java.lang包下的各个核心类,如String、Thread、ThreadLocal等。对于java.util包下的集合框架,文档不仅介绍了各种集合类,...