`
rxin2009
  • 浏览: 17357 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

CountDownLatch、Semaphore、FutureTask源码解析

 
阅读更多

本文的主要内容是认识CountDownLatch、Semaphore、FutureTask的源码,这三个类在AQS中都用的是共享模式的,而且是可中断的,不同的是各自的请求锁和释放锁的操作,先说他们的不同点,在来说相同点。

 

首先看看CountDownLatch(闭锁),这个类在构建的时候初始一个阀值N,每调用一次countDown()后,N将减一直到为0时,线程才可以通过,下面来看看请求锁的代码

public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

 

这里的state就是构建的时候指定的阀值,当state=0时,线程可以通过,否则进入同步队列等待。再来看看释放锁的操作

 public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

      每执行一次countDown()方法就将state减一,如果state=0就返回ture,那么他将唤醒等待队列中第一个等待者。将这两部分串起来理解就是这个类的特点了。

 

      接下来看看Semaphore(信号量),这个类有两种状态:公平和非公平,这里介绍的是非公平的实现。信号量的意义是指在构建的时候指定许可数N,在超过N个线程请求后,线程将进入同步队列等待,当然它也提供了增加许可的方法,现在来看看请求许可的代码

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

 这是一个很典型的非阻塞算法,每次请求的时候将当前许可数减一,如果剩余许可remaining>=0,将执行cas操作,然后返回remaining,如果剩余许可大于等于0,线程可通过,否则进入同步队列等待。

再来看看释放许可的代码

 protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int p = getState();
                if (compareAndSetState(p, p + releases))
                    return true;
            }
        }

将许可数state增加指定的值,cas操作成功后返回,否则重试。将这两部分的代码结合起来理解就是信号量要表达的意思了。

 

接下来看看FutureTask,这是一种异步任务的实现,它的特点就是在任务执行完之前去获取任务的结果,线程将进入同步队列等待直到任务完成,如果任务已完成直接获得结果。

在FutureTask的实现中,它自己定义了三个状态

 /** State value representing that task is running */
        private static final int RUNNING   = 1;
        /** State value representing that task ran */
        private static final int RAN       = 2;
        /** State value representing that task was cancelled */
        private static final int CANCELLED = 4;

 具体的含义有英文解释,定义的值是1、2、4是为了方便位运算,下面来看看请求锁的代码

/**
         * Implements AQS base acquire to succeed if ran or cancelled
         */
        protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }

方法innerIsDone()表示任务是否完成,包括RAN和CANCELLED,任务在刚新建的时候的状态是为0,执行的过程中为

1,执行完为2,被删除后状态为4。下面再来看看"释放锁"的代码

/**
         * Implements AQS base release to always signal after setting
         * final done status by nulling runner thread.
         */
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }

 

始终返回true,结合这两部分代码就可以理解FutureTask的特点了。

 

最后来看看他们的相同部分:共享模式的入队操作,代码如下

/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { //标记1
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

 

代码中同独占模式最大的不同点在于标记1的位置,条件判断中变量r在闭锁中表示state是否为0(true返回1,否则为-1),信号量中表示剩余信号量的值,FutureTask中表示任务是否完成(true返回1,否则返回-1),接下来看看

setHeadAndPropagate(node, r)的代码

private void setHeadAndPropagate(Node node, int propagate) {
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) {
            /*
             * Don't bother fully figuring out successor.  If it
             * looks null, call unparkSuccessor anyway to be safe.
             */
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }

 将当前节点设置为头结点,如果有足够的许可,并且后面还有等待节点就唤醒下一个节点,这一点和独占模式比起来就有一点共享的意思,比如CountDownLatch,只要countDown次数足够全部的等待线程都会唤醒;Semaphore,只要还有信号量就会继续唤醒等待线程;FutureTask,只要任务完成,所有的等待线程都将唤醒。

这里的中断机制是,线程被中断后,先把节点从同步队列中出队后抛出中断异常,java中中断就是这种模式的:线程A中断线程B后,由线程B选择适当的时候响应中断,这个操作由线程B自己处理的。

 

以上三个类都是AQS在共享模式下的应用。

分享到:
评论

相关推荐

    线程实例(并发库引入到Java标准库 )

    有以下类的实例: ThreadPool ScheduledThread CyclicBarrier BlockingQueue CountDownLatch FutureTask CompletionService Semaphore

    Java并发编程原理与实战

    CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 ...

    java_util_concurrent中文版pdf

    4. **同步工具类**:CountDownLatch、CyclicBarrier、Semaphore和FutureTask等工具类为并发编程提供了更多的协调机制。CountDownLatch用于一次性释放多个线程,CyclicBarrier则允许一组线程等待所有线程到达屏障点后...

    javaforkjoin源码-xxy-JavaStudy:xx-JavaStudy

    java forkjoin 源码 JDK源码学习: Java 容器 ArrayList LinkedList PriorityQueue ...CountDownLatch CyclicBarrier Semaphore ForkJoin FutureTask BlockingQueue Spring AOP IOC 面向面经复习 ​

    Java并发包源码分析(JDK1.8)

    AQS相关应用(CountDownLatch、CyclicBarrier、Semaphore等),executor(ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask等),collection(ConcurrentHashMap、CopyOnWriteArrayList等), ...

    疯狂Java面试题

    - 并发工具类:Semaphore,CyclicBarrier,CountDownLatch,FutureTask等。 6. **IO流** - 字节流与字符流:InputStream、OutputStream、Reader、Writer的区别。 - 流的分类:节点流与处理流,输入流与输出流。 ...

    笔记-2、线程的并发工具类2

    - CountDownLatch的放行是由第三方(持有计数器的线程)控制,而CyclicBarrier的放行是由参与等待的线程自身控制。 - CountDownLatch的放行条件是计数器的值大于等于线程数,而CyclicBarrier的放行条件是计数器值...

    这就是标题—— JUC.pdf

    CountDownLatch (减少计数器) CyclicBarrier(加法计数器) Semaphore(信号量,流量控制) ReentrantReadWriteLock (读写锁) BlockingQueue(阻塞队列) 线程池 池化技术 线程池的优势 线程池的特点 线程池三大...

    龙果 java并发编程原理实战

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    Java 并发编程原理与实战视频

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    龙果java并发编程完整视频

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    java并发编程

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    java多线程并发编程例子

    5. **CountDownLatch**:`CountDownLatch`是一个计数器,通常用于让一个线程等待其他线程完成操作。计数器减到零时,所有等待的线程都可以继续执行。`TestCountDownLatch.java`可能展示了如何使用`CountDownLatch`来...

    JUC+课程源码+线程操作

    在JUC中,核心组件包括`ExecutorService`、`Semaphore`、`CountDownLatch`、`CyclicBarrier`、`Future`、`BlockingQueue`等。这些组件提供了丰富的线程管理、同步、通信和协调机制。 1. `ExecutorService`:它是...

    java中的并发变成学习笔记2

    - **FutureTask**:代表一个异步计算的结果,它实现了`Future`接口,允许查询计算是否完成,获取结果或者取消计算。 2. **使用场景** - **CountDownLatch**:适用于需要等待多个线程执行完毕后再继续的场景,如...

    backport-util-concurrent-3.1.jar

    这个库包含了许多实用的工具类,如`FutureTask`、`CountDownLatch`、`Semaphore`等,这些工具类可以帮助开发者更好地处理并发问题,提高程序的性能和可扩展性。 主要特点如下: 1. 兼容性:这个库兼容JDK 1.5及以上...

    个人学习JUC代码笔记总集

    如线程池(ExecutorService)、并发容器(ConcurrentHashMap、CopyOnWriteArrayList等)、锁机制(ReentrantLock、Semaphore等)、原子类(AtomicInteger、AtomicReference等)以及并发工具类(CountDownLatch、...

    java实现线程的异步

    还需要考虑并发策略,如同步机制(synchronized、Lock)、并发容器(ConcurrentHashMap、CopyOnWriteArrayList)等,以及异常处理和线程间的通信(wait/notify、CountDownLatch、CyclicBarrier、Semaphore)。...

Global site tag (gtag.js) - Google Analytics