`

线程池汇总2

 
阅读更多

上篇(线程池汇总1):http://1181731633.iteye.com/admin/blogs/2342551

线程池的源码实现:

一、线程池基类ThreadPoolExecutor:

1)看下其成员变量:

 

    private final BlockingQueue<Runnable> workQueue;//任务队列,工作线程从这里拿任务执行  
    private final ReentrantLock mainLock = new ReentrantLock();  
    /** 
     * Set containing all worker threads in pool. Accessed only when 
     * holding mainLock. 
     */  
    private final HashSet<Worker> workers = new HashSet<Worker>();//工作线程(执行任务可理解为线程take()取队列任务,无则阻塞),就是初始化的时候有多少个同时工作的线程  
    private final Condition termination = mainLock.newCondition();  
    private int largestPoolSize;//  
    private long completedTaskCount;//完成的任务数  
    private volatile ThreadFactory threadFactory;  
  
    /** 
     * Handler called when saturated or shutdown in execute. 
     */  
    private volatile RejectedExecutionHandler handler;  
  
    /** 
     * Timeout in nanoseconds for idle threads waiting for work. 
     * Threads use this timeout when there are more than corePoolSize 
     * present or if allowCoreThreadTimeOut. Otherwise they wait 
     * forever for new work. 
     */  
    private volatile long keepAliveTime;  
  
    /** 
     * If false (default), core threads stay alive even when idle. 
     * If true, core threads use keepAliveTime to time out waiting 
     * for work. 
     */  
    private volatile boolean allowCoreThreadTimeOut;  
  
    /** 
     * Core pool size is the minimum number of workers to keep alive 
     * (and not allow to time out etc) unless allowCoreThreadTimeOut 
     * is set, in which case the minimum is zero. 
     */  
    private volatile int corePoolSize;  
  
    /** 
     * Maximum pool size. Note that the actual maximum is internally 
     * bounded by CAPACITY. 
     */  
    private volatile int maximumPoolSize;
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();//调用线程池子类的自定义队列的take()方法
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
 

 

 

 2)看下Worker的代码:

        public void run() {
            runWorker(this);
        }

 

 3)此类常用来创建线程池有两种():主要是最大线程数和任务队列不同。

public static ExecutorService newFixedThreadPool(int arg) {  
      return new ThreadPoolExecutor(arg, arg, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());  
   }  

 

核心线程数为0,最大线程数为Integer最大值,队列比较特殊

public static ExecutorService newCachedThreadPool() {  
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());  
   }  

 

 二、ScheduledThreadPoolExecutor类:

1)构造方法和提交线程任务方法:

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

 2)ScheduledFutureTask的实现:

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;

        /** The time the task is enabled to execute in nanoTime units */
        private long time;
        private final long period;
        RunnableScheduledFuture<V> outerTask = this;
        public void run() {
            boolean periodic = isPeriodic();//是否周期性任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//重置时间
                reExecutePeriodic(outerTask);//重新放入队列
            }
        }
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
    }

 3)DelayedWorkQueue队列:

 

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

        /*
         * A DelayedWorkQueue is based on a heap-based data structure
         * like those in DelayQueue and PriorityQueue, except that
         * every ScheduledFutureTask also records its index into the
         * heap array. This eliminates the need to find a task upon
         * cancellation, greatly speeding up removal (down from O(n)
         * to O(log n)), and reducing garbage retention that would
         * otherwise occur by waiting for the element to rise to top
         * before clearing. But because the queue may also hold
         * RunnableScheduledFutures that are not ScheduledFutureTasks,
         * we are not guaranteed to have such indices available, in
         * which case we fall back to linear search. (We expect that
         * most tasks will not be decorated, and that the faster cases
         * will be much more common.)
         *
         * All heap operations must record index changes -- mainly
         * within siftUp and siftDown. Upon removal, a task's
         * heapIndex is set to -1. Note that ScheduledFutureTasks can
         * appear at most once in the queue (this need not be true for
         * other kinds of tasks or work queues), so are uniquely
         * identified by heapIndex.
         */

        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;

        /**
         * Thread designated to wait for the task at the head of the
         * queue.  This variant of the Leader-Follower pattern
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
         * minimize unnecessary timed waiting.  When a thread becomes
         * the leader, it waits only for the next delay to elapse, but
         * other threads await indefinitely.  The leader thread must
         * signal some other thread before returning from take() or
         * poll(...), unless some other thread becomes leader in the
         * interim.  Whenever the head of the queue is replaced with a
         * task with an earlier expiration time, the leader field is
         * invalidated by being reset to null, and some waiting
         * thread, but not necessarily the current leader, is
         * signalled.  So waiting threads must be prepared to acquire
         * and lose leadership while waiting.
         */
        private Thread leader = null;
        private final Condition available = lock.newCondition();
        //取最小时间定时线程任务
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {//循环,直到获取到元素为止
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);//定时任务时间到,返回任务执行,并将数组最后一个元素放到队列头,然后依次与两个字节点进行比较,首先与根节点两个子节点[数组坐标1和2]比较,并与最小的交换位置,将之移到队列头,依然类推,在根节点的其中一个分支比较到底(用k<size >>> 1限制)。
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);//等待相应线程的定时时间
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }
        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {//在根节点[数组坐标0]的其中一个分支比较到底
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;//每个父节点有两个子节点,两个子节点的大小不确定,所以都需要比较一下
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            setIndex(key, k);
        }

        //新增一个线程任务
        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);//如果只有一个元素,放在首位就行了
                } else {
                    siftUp(i, e);//比较时间大小并插入数组
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
        /**先将元素放到数组的末尾,并不断与自己的父节点比较并交换位置,直到找到合适位置
         * Sifts element added at bottom up to its heap-ordered spot.
         * Call only when holding lock.
         */
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;//二叉树时间排序值末尾的最大分支的父节点
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;//大于父节点的值,则找到了真正的位置,返回
                queue[k] = e;
                setIndex(e, k);//小于父节点的值,和父节点调换位置
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

 以上算法逻辑是堆排序。


 三、阻塞队列

ArrayBlockingQueue 底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。
LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。
SynchronousQueue 本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。
PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。

 

 

 

 

 

 

 

  • 大小: 10.6 KB
0
0
分享到:
评论

相关推荐

    Java8并行流中自定义线程池操作示例

    更多java相关内容感兴趣的读者可查看本站专题:《Java进程与线程操作技巧总结》、《Java数据结构与算法教程》、《Java操作DOM节点技巧总结》、《Java文件与目录操作技巧汇总》和《Java缓存操作技巧汇总》希望本文所...

    Python 应用之线程池.pdf

    首先,Python作为一门高级编程语言,非常适合处理网络密集型的任务,比如控制成百上千台客户机、分发任务和汇总提交等。它能够有效地抽象复杂任务,编写数据采集任务并自动完成各种报表数据的分析及收集。Python通过...

    uThreadPool线程池示例(查找0-1亿之间的质数任务)

    5. 回收结果:所有任务完成后,线程池将结果汇总,可能存储在共享数据结构中。 6. 关闭线程池:所有任务执行完毕后,关闭线程池,释放资源。 在压缩包文件“线程池D7”中,可能包含了`uThreadPool`的源代码、示例...

    线程池实现蚁群算法的简单并行

    2. **任务提交**:当需要启动新的搜索任务时,将任务信息(如蚂蚁个体、当前迭代次数)封装成结构体,并提交到线程池的任务队列中。 3. **线程工作逻辑**:线程从队列中取出任务,模拟蚂蚁行走并更新信息素,然后...

    单线程 多线程 线程池 计算随机数的一价和

    在计算随机数的一价和场景中,我们可以设置一个线程池,预先创建好一定数量的线程,每个线程从任务队列中取出一部分随机数进行累加,最后汇总所有线程的计算结果。这种方式既高效又灵活,可以应对任务量的变化。 **...

    fixedThreadPoolPlus:支持在线程池中线程耗尽的情况下,将线程池中所有线程当前正在执行那条代码的信息汇总并打印出来,方便定位问题

    支持在线程池中线程耗尽的情况下,将线程池中所有线程当前正在执行那条代码的信息汇总,作为异常信息抛出, 同时将线程全部的完整堆栈信息保存到用户目录下的 "线程名.detail" 文件中,方便定位及排查线程池耗尽时的...

    多线程并行执行,汇总结果

    这时,可以设置一个`CountDownLatch`,初始化计数值为线程的数量,每个线程执行完后调用`countDown()`,最后主线程通过调用`await()`方法等待计数值变为0,即所有线程执行完毕,然后进行结果的汇总。 **汇总结果** ...

    java线程池处理多并发,所有进程执行完后再统一处理结果

    java线程池处理多并发,所有进程执行完后再统一处理结果线程池配置类多线程并行demo 线程池配置类 import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation....

    多线程访问mysql数据库

    2.mysql5,开源的windows下使用的mysql2次开发库,mysql官网上可以下载 3.testDb,简单的应用DBSqlMan的例子 整个工程已经在VS2005下运行通过,并在实际应用中使用过。提供了mysql访问效率 简单讲解下CMySQLMan类的...

    BATJ面试题汇总及详解(进大厂必看)

    2. 控制并发量:通过设置线程池的大小,可以限制同时运行的线程数量,防止过多线程导致系统资源耗尽。 3. 管理线程:线程池提供了一种机制来管理线程,包括线程的调度、监控和异常处理。 4. 提高响应速度:新任务...

    各大公司java面试大汇总2

    【标题】"各大公司java面试大汇总2"揭示了这个压缩包内容的主旨,它是一个包含多份文档和PDF的集合,旨在提供Java面试的重要问题和答案,帮助求职者准备不同公司的面试。这些文档可能涵盖了从基础到高级的Java编程...

    Android、java面试技巧及常见性面试题型精编汇总.zip

    线程、多线程和线程池面试专题 Android面试中常被问基础知识点汇总宝典 初级面试专题(中小厂) 混合跨平台开发面试题 设计模式面试专题及答案 中高级专题(View+Handler+Binder) 底层机制突破 面试必备之乐观锁与...

    优秀博文汇总1

    本资源摘要信息涵盖了 Java 后端开发的多个方面,包括 JPA 防踩坑姿势、Servlet 3 异步原理与实践、Tomcat 源码剖析、Java 并发编程、Java 线程池、AbstractQueuedSynchronizer、Tomcat 系列、Netty 系列、Kafka ...

    Bat面试题汇总&详解

    2. 直接使用`ThreadPoolExecutor`类,自定义线程池的核心参数,包括核心线程数、最大线程数、线程存活时间、任务队列等。 线程的生命周期包括新建、就绪、运行、阻塞和死亡。僵死进程(僵尸进程)是已经终止但其父...

    Tomcat调优及相关设置汇总-paulen.docx.rar_Tomcat调优及相关设置汇总_flowerd54

    《Tomcat调优及相关设置汇总》是一份详细探讨如何优化Apache Tomcat服务器性能的重要文档,由作者flowerd54编写。Tomcat作为广泛使用的开源Java Servlet容器,其性能调优对于提升Web应用的响应速度和处理能力至关...

    2019金三银四30家公司面试题汇总.doc

    Java 线程池原理是指 Java 中的线程池实现机制,包括线程池的创建、线程的执行和线程的回收等过程。CAS 机制是 Compare-And-Swap 的缩写,是一种原子操作机制,用于实现多线程环境下的同步。Synchronized 是 Java 中...

    java高级面试题汇总

    Java高级面试题汇总 本资源摘要信息中,我们将对 Java 高级面试题进行总结和分析,涵盖了 Java 面试题的方方面面,从数据类型转换到框架选择,从线程池状态到 Java 8 的元空间替换,从跳出多重循环到 JSON 对象解析...

    java基础编程题目总汇

    Java基础编程题目总汇是针对Java初学者设计的一系列练习题,旨在巩固和深化Java编程基础知识。这些题目覆盖了从基本语法、数据类型到控制结构、类与对象、异常处理等多个核心领域,对于想要系统性地提升Java编程技能...

    2023年面试记录汇总

    2. **JVM调优**: - 调优主要包括内存配置、垃圾收集器选择、性能监控等,目标是提高系统性能和稳定性,避免内存泄漏或过度垃圾回收。 - 参数如`-Xms`和`-Xmx`用于设置堆内存的最小和最大值,`-XX:NewRatio`调整...

Global site tag (gtag.js) - Google Analytics