Previous article (Thread Pool Summary, Part 1): http://1181731633.iteye.com/admin/blogs/2342551
The thread pool source code:
I. The thread pool base class, ThreadPoolExecutor:
1) Its member variables, plus the core worker loop (runWorker() and getTask()):
private final BlockingQueue<Runnable> workQueue; // task queue; worker threads take tasks from here and run them

private final ReentrantLock mainLock = new ReentrantLock();

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>(); // worker threads (a worker can be thought of as repeatedly take()ing a task from the queue and running it, blocking when the queue is empty); this is how many threads are working at the same time

private final Condition termination = mainLock.newCondition();

private int largestPoolSize; // largest pool size ever reached

private long completedTaskCount; // number of completed tasks

private volatile ThreadFactory threadFactory;

/**
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

/**
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
private volatile long keepAliveTime;

/**
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 */
private volatile boolean allowCoreThreadTimeOut;

/**
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 */
private volatile int corePoolSize;

/**
 * Maximum pool size. Note that the actual maximum is internally
 * bounded by CAPACITY.
 */
private volatile int maximumPoolSize;

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // calls take() on the work queue, which may be a subclass-specific queue (e.g. DelayedWorkQueue)
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
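As runWorker() shows, beforeExecute() and afterExecute() are protected extension hooks invoked around every task, and the Throwable thrown by a task is passed to afterExecute() in the finally block. A minimal sketch of how these hooks can be used (the class name and log messages here are my own, for illustration only):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LoggingThreadPool extends ThreadPoolExecutor {

    public LoggingThreadPool(int core, int max) {
        super(core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println(t.getName() + " about to run " + r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is non-null only when the task died with a RuntimeException or Error
        System.out.println("finished " + r + (t == null ? "" : ", failed with " + t));
    }
}

Because afterExecute() always receives the caught Throwable, this is a common place to log tasks that would otherwise die silently when submitted via execute().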
2) The Worker class: its run() method simply delegates to runWorker() above:
public void run() { runWorker(this); }
3) Two thread pools are commonly created from this class (through the Executors factory methods); the main differences are the maximum thread count and the task queue. newFixedThreadPool() uses equal core and maximum sizes and an unbounded LinkedBlockingQueue:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool() uses a core size of 0, a maximum of Integer.MAX_VALUE, and a rather special queue: a SynchronousQueue has no capacity, so each offered task is either handed directly to an idle worker or triggers the creation of a new thread.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
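A short usage sketch of the practical difference (the task bodies below are purely illustrative): the fixed pool never grows beyond its configured size and queues the excess tasks, while the cached pool hands each task to an idle thread or creates a new one if none is free:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolDemo {
    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(2);  // at most 2 worker threads; extra tasks wait in the unbounded queue
        ExecutorService cached = Executors.newCachedThreadPool(); // no core threads; grows on demand, idle threads die after 60s

        for (int i = 0; i < 5; i++) {
            final int n = i;
            fixed.execute(() -> System.out.println("fixed  " + n + " on " + Thread.currentThread().getName()));
            cached.execute(() -> System.out.println("cached " + n + " on " + Thread.currentThread().getName()));
        }
        fixed.shutdown();
        cached.shutdown();
    }
}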
II. The ScheduledThreadPoolExecutor class:
1) Constructor and the task-submission method schedule():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
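A usage sketch of schedule() for one-shot tasks (delay values chosen arbitrarily):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);

        // one-shot Runnable: runs once, 2 seconds from now
        ses.schedule(() -> System.out.println("one-shot"), 2, TimeUnit.SECONDS);

        // one-shot Callable: get() blocks until the task has run
        ScheduledFuture<Integer> f = ses.schedule(() -> 42, 1, TimeUnit.SECONDS);
        System.out.println(f.get()); // prints 42 after about 1 second

        ses.shutdown();
    }
}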
2) The ScheduledFutureTask implementation:
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    private long time;

    private final long period; // > 0: fixed-rate, < 0: fixed-delay, 0: one-shot task

    RunnableScheduledFuture<V> outerTask = this;

    public void run() {
        boolean periodic = isPeriodic(); // is this a periodic task?
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();             // compute the next trigger time
            reExecutePeriodic(outerTask); // put the task back into the queue
        }
    }

    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;              // fixed-rate: next trigger = previous trigger + period
        else
            time = triggerTime(-p); // fixed-delay: next trigger = now + delay
    }
}
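Note how setNextRunTime() uses the sign of period to encode the two periodic modes: scheduleAtFixedRate() stores a positive period and advances the previous trigger time by p, while scheduleWithFixedDelay() stores a negative period, so the next trigger time is computed from the moment the task finishes. A small sketch of the observable difference (the sleep duration is only illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);

        Runnable slow = () -> {
            System.out.println(System.currentTimeMillis() + " running on " + Thread.currentThread().getName());
            try {
                Thread.sleep(500); // the task itself takes ~0.5s
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // fixed rate: successive runs start ~1s apart (period stored as a positive value)
        ses.scheduleAtFixedRate(slow, 0, 1, TimeUnit.SECONDS);

        // fixed delay: each run starts ~1s after the previous one finishes,
        // i.e. ~1.5s apart here (period stored as a negative value internally)
        ses.scheduleWithFixedDelay(slow, 0, 1, TimeUnit.SECONDS);
    }
}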
3) The DelayedWorkQueue:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable> {

    /*
     * A DelayedWorkQueue is based on a heap-based data structure
     * like those in DelayQueue and PriorityQueue, except that
     * every ScheduledFutureTask also records its index into the
     * heap array. This eliminates the need to find a task upon
     * cancellation, greatly speeding up removal (down from O(n)
     * to O(log n)), and reducing garbage retention that would
     * otherwise occur by waiting for the element to rise to top
     * before clearing. But because the queue may also hold
     * RunnableScheduledFutures that are not ScheduledFutureTasks,
     * we are not guaranteed to have such indices available, in
     * which case we fall back to linear search. (We expect that
     * most tasks will not be decorated, and that the faster cases
     * will be much more common.)
     *
     * All heap operations must record index changes -- mainly
     * within siftUp and siftDown. Upon removal, a task's
     * heapIndex is set to -1. Note that ScheduledFutureTasks can
     * appear at most once in the queue (this need not be true for
     * other kinds of tasks or work queues), so are uniquely
     * identified by heapIndex.
     */

    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;

    /**
     * Thread designated to wait for the task at the head of the
     * queue. This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting. When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely. The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim. Whenever the head of the queue is replaced with a
     * task with an earlier expiration time, the leader field is
     * invalidated by being reset to null, and some waiting
     * thread, but not necessarily the current leader, is
     * signalled. So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

    private final Condition available = lock.newCondition();

    // Take the scheduled task with the earliest trigger time.
    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) { // loop until an element is obtained
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // The head task is due: finishPoll() returns it, moves the
                        // last array element to the head, and sifts it down --
                        // starting at the root, the element is compared with both
                        // children (array indexes 1 and 2), swapped with the smaller
                        // one, and so on down a single branch (bounded by k < size >>> 1).
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay); // wait out the head task's remaining delay
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

    private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
        int s = --size;
        RunnableScheduledFuture<?> x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
        return f;
    }

    private void siftDown(int k, RunnableScheduledFuture<?> key) {
        int half = size >>> 1;
        while (k < half) { // walk down one branch from the root (index 0)
            int child = (k << 1) + 1;
            RunnableScheduledFuture<?> c = queue[child];
            int right = child + 1; // each parent has two children; both must be checked to find the smaller
            if (right < size && c.compareTo(queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo(c) <= 0)
                break;
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }

    // Add a new scheduled task.
    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0); // the only element simply becomes the head
            } else {
                siftUp(i, e);   // compare trigger times and sift the element into place
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**
     * Sifts element added at bottom up to its heap-ordered spot:
     * the element starts at the end of the array and is repeatedly
     * compared and swapped with its parent until it reaches its place.
     * Call only when holding lock.
     */
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1; // index of the parent node in the heap
            RunnableScheduledFuture<?> e = queue[parent];
            if (key.compareTo(e) >= 0)
                break;                  // not earlier than the parent: position found
            queue[k] = e;
            setIndex(e, k);             // earlier than the parent: move the parent down
            k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
    }

    // ... (grow(), remove(), poll(), peek(), drainTo() and other methods omitted)
}
The logic above is a binary min-heap ordered by trigger time (the same priority-heap structure used by DelayQueue and PriorityQueue): offer()/siftUp() bubbles a new task up toward the root, and take()/finishPoll()/siftDown() pushes the replacement element back down, so the task with the earliest trigger time always sits at index 0.
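To make the sift logic easier to follow in isolation, here is a stripped-down sketch of the same idea over a plain long[] of trigger times (the class, fields, and fixed capacity are my own simplifications; the real queue also grows its array and tracks each task's heap index):

public class MinHeapSketch {
    private final long[] heap = new long[16];
    private int size = 0;

    // Insert at the end, then swap with the parent until heap order holds (cf. siftUp).
    public void offer(long key) {
        int k = size++;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (key >= heap[parent]) break;
            heap[k] = heap[parent];
            k = parent;
        }
        heap[k] = key;
    }

    // Remove the root, move the last element to index 0, then push it down (cf. finishPoll + siftDown).
    public long poll() {
        long root = heap[0];
        long last = heap[--size];
        int k = 0, half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1, right = child + 1;
            if (right < size && heap[right] < heap[child]) child = right;
            if (last <= heap[child]) break;
            heap[k] = heap[child];
            k = child;
        }
        heap[k] = last;
        return root;
    }

    public static void main(String[] args) {
        MinHeapSketch h = new MinHeapSketch();
        for (long t : new long[] {30, 10, 20, 5}) h.offer(t);
        System.out.println(h.poll() + ", " + h.poll()); // prints "5, 10" -- earliest trigger times come out first
    }
}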