`

读部分ThreadPoolExecutor源码

阅读更多
//线程池
//先看构造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }


public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


//执行线程并添加到集合中
/**
如果当前线程比corePoolSize小,那么直接新启一个线程放入到集合中并执行。
(这里执行其实是无限循环从队列中获取任务来执行)
否则将任务放入队列中,由线程去取来执行。
*/
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
	//如果当前线程数量小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
	    //新增线程并执行线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
	        //拒绝任务
                reject(command);
            //线程数为0新增线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }


//将线程添加到集合中并执行
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
		//新增线程数成功跳出循环
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
		   //这里是关键这里会执行runWorker(this);
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
	       //尝试终止线程
                addWorkerFailed(w);
        }
        return workerStarted;
    }


 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
	    //无限循环等待接受线程
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //是否需要中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
		    //钩子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
		        //钩子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
	    //如果发生异常了做清除工作
            processWorkerExit(w, completedAbruptly);
        }
    }


//从队列中获取任务
 private Runnable getTask() {
        boolean timedOut = false; 
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

//线程执行完做一些清理工作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
	    //删掉当前线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        
	//尝试终止
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
	    //添加一个线程处理线程池中的任务
            addWorker(null, false);
        }
    }

 final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }


分享到:
评论

相关推荐

    安卓源码和异步处理

    在安卓应用开发中,源码和异步处理是至关重要的组成部分。源码是应用程序的核心,它揭示了程序的内部工作原理,而异步处理则确保了用户界面的流畅性,避免了因耗时操作导致的卡顿。在这个场景中,我们有两个安卓项目...

    《Java并发编程高阶技术-高性能并发框架源码解析与实战》学习.zip

    这些框架如线程池(java.util.concurrent.ThreadPoolExecutor)、并发集合(ConcurrentHashMap、CopyOnWriteArrayList等)以及同步工具类(Semaphore、CyclicBarrier、CountDownLatch等),都是Java并发编程的核心...

    java多线程设计模式详解(PDF及源码)

    Java多线程设计模式是Java开发中不可或缺的一部分,它涉及到并发编程、系统性能优化以及程序的稳定性。在Java中,多线程可以提高程序的执行效率,通过将任务分解到多个独立的执行线程,使得程序能同时处理多个任务。...

    java高并发编程源码.zip

    - **CopyOnWriteArrayList/CopyOnWriteArraySet**:写时复制策略,读操作无锁,写操作复制整个数组,适合读多写少的场景。 5. **线程池**: - **ExecutorService**:Java并发框架的一部分,提供线程池服务,可以...

    Java多线程设计模式(附带书中源码)

    4. **读写锁模式**:Java的`ReentrantReadWriteLock`提供了读写锁,允许多个读操作并发执行,但在写操作时,所有读写操作都会阻塞,从而保证数据的一致性。这种模式提高了对共享资源的并发访问效率。 5. **条件变量...

    java并发集合

    Java并发集合是Java并发编程的一部分,提供了线程安全的数据结构,使得多个线程可以同时访问并修改集合内容而不会产生数据不一致的问题。这些集合类主要存在于`java.util.concurrent`包中,由Doug Lea设计,极大地...

    java多线程设计模式详解

    Java的ExecutorService和ThreadPoolExecutor类提供了线程池的实现,可以根据任务需求调整线程池的大小、工作队列容量等参数。 5. **读写锁模式**:在多线程访问共享数据时,读操作通常比写操作更频繁且不会改变数据...

    Java Concurrency的一些资料

    在Java编程领域,多线程并发处理是不可或缺的一部分,它涉及到如何高效地利用多核处理器资源,提升程序的运行效率。本资料集主要探讨的是Java Concurrency相关的知识点,旨在帮助开发者理解和掌握Java平台上的并发...

    线程的同步(培训机构内部资料)

    线程同步的主要目标是确保数据的一致性和完整性,防止“脏读”、“不可重复读”和“幻读”等并发问题。 Java提供了多种线程同步机制,包括: 1. **synchronized 关键字**:这是Java中最基础的同步机制,可以用于...

    异步调度设计

    在Java中,我们可以利用线程池(ThreadPoolExecutor)进行异步任务调度。线程池允许我们预先创建一定数量的线程,任务提交后,线程池中的空闲线程会立即处理,而不需要每次都创建新线程,这降低了系统开销。另外,...

    java-concurrent:java 并发编程例子

    在Java编程领域,并发编程是不可或缺的一部分,尤其是在多线程应用和服务器端开发中。`java-concurrent`通常指的是Java的并发包`java.util.concurrent`,这个包提供了大量的类和接口,帮助开发者有效地管理和控制...

    job4j_threads

    在Java编程中,多线程是不可或缺的一部分,尤其在当今高性能、高并发的应用场景下,掌握多线程技术至关重要。"job4j_threads"是一个专门针对Java多线程学习的资源包,它可能包含了项目源码、教程文档等内容,旨在...

    DeFiler:一个简单的多线程文件系统,称为Devil File System,或DeFiler

    在Java中实现多线程,开发者通常会使用Thread类、Runnable接口或者是ExecutorService,ThreadPoolExecutor等高级API。DeFiler的实现可能包括了线程池的设计,以有效地管理和调度线程,避免过度创建和销毁线程带来的...

Global site tag (gtag.js) - Google Analytics