`
jnullpointer
  • 浏览: 15831 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

AbstractQueuedSynchronizer方法解析

阅读更多
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //如果设置t.next = node;
                //compareAndSetTail如果设置tail失败,则需要解除t的next关联,所以在compareAndSetTail设置成功后再设置t.next = node
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        //分三种情况:
         //1.如果node为tail,将pred设为tail,pred.next设为空
         //2.如果node不为tail,且pred为signal时,不用unpark后续节点,
         //  只需要设置pred.next = node.next,丢失中间所有cancell的节点
         //3.pred为head节点,unpark后续节点
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            //导致unparkSuccessor只能从tail往前找waitStatus <= 0的节点
              //从head节点往后找会出现闭环
            node.next = node; // help GC
        }
        //比jdk1.5的实现要更加精确
    }


       
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        /*
        1.如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获
           得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行2。 
          2.如果前一个节点的等待状态waitStatus>0,也就是前一个节点
            被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节
            点的waitStatus<=0,进行4。否则进行3。 
          3.前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,
            表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。
            进行4。 
          4.返回false,表示线程不应该park()。
        */
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                //如果nanosTimeout < spinForTimeoutThreshold
                //自旋会比线程切换更有效率,否则才做park
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        //前任节点状态为cancelled,compareAndSetWaitStatus设置前任节点
         //状态失败,表明前任节点此时已经cancelled,唤醒线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
分享到:
评论

相关推荐

    Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)

    **Java并发系列之AbstractQueuedSynchronizer源码分析概要** **1. AbstractQueuedSynchronizer(AQS)的定义与作用** AbstractQueuedSynchronizer(AQS)是Java并发编程中的核心组件,它是一个抽象的、基于FIFO...

    AbstractQueuedSynchronizer.pdf

    Java大神Doug Lea对AQS的解析:Most synchronizers (locks, barriers, etc.) in the J2SE1.5 java.util.concurrent package are constructed using a small framework based on class AbstractQueuedSynchronizer. ...

    ThreadPoolExecutor源码解析.pdf

    《ThreadPoolExecutor源码解析》 ThreadPoolExecutor是Java并发编程中重要的组件,它是ExecutorService接口的实现,用于管理和调度线程的执行。理解其源码有助于我们更好地控制并发环境下的任务执行,提高系统的...

    JDK_AQS解析

    本文将详细解析AQS中的关键方法以及其工作原理。 #### 锁的类图与架构 AQS采用模板方法模式,大多数与锁相关的操作都在`AbstractQueuedSynchronizer`类中完成。它提供了一个同步器的框架,其中包含了共享资源的状态...

    Java并发编程解析 | 解析AQS基础同步器的设计与实现

    "Java并发编程解析 | 解析AQS基础同步器的设计与实现" 在Java领域中,解决并发编程问题的关键是解决同步和互斥的问题。同步是指线程之间的通信和协作,互斥是指同一时刻只能允许一个线程访问共享资源。Java领域中有...

    第六章 ReentrantLock源码解析2--释放锁unlock()1

    从源码中可以看到,unlock()方法调用了AbstractQueuedSynchronizer的release()方法,并传入参数1,表示要释放一个锁的数量。release()方法的源码如下: ```java public final boolean release(int arg) { if ...

    ReentrantLock解析

    《ReentrantLock深度解析》 在Java并发编程中,ReentrantLock是JDK提供的一个可重入互斥锁,它是java.util.concurrent.locks包下的核心类。与synchronized关键字相比,ReentrantLock提供了更高的灵活性,如尝试加锁...

    [原创]咕泡教育30万字大厂面试真题深度解析(1).pdf

    1. AQS 的理解:AQS(AbstractQueuedSynchronizer)是 Java 中的一种同步器框架,提供了一种能够实现锁的机制,用于协调多个线程之间的访问。AQS 的核心思想是使用一个 volatile 变量来记录锁的状态,并使用 CAS ...

    21 更高级的锁—深入解析Lock.pdf

    【标题】: "21 更高级的锁—深入解析Lock.pdf" 【描述】: 这份资料详细介绍了Java并发编程中的高级锁机制,特别是ReentrantLock的使用与原理。 【标签】: java, 并发, 编程, 宝典 在Java并发编程中,synchronized...

    Java源码解析之可重入锁ReentrantLock

    Java源码解析之可重入锁ReentrantLock ReentrantLock是一个可重入锁,在ConcurrentHashMap中使用了ReentrantLock。它是一个可重入的排他锁,它和synchronized的方法和代码有着相同的行为和语义,但有更多的功能。 ...

    CountDownLatch源码解析之countDown()

    compareAndSetState()方法是AQS(AbstractQueuedSynchronizer)类中的一个方法,用于比较并设置当前状态的值。该方法的实现可以分为以下两步: * 首先,通过unsafe.compareAndSwapInt()方法比较当前状态的值expect...

    Chinese_AQS:添加中文注释, 重新按业务部门开发工程师的思维角度组织代码中的方法顺序, 使中国程序员轻松理解 AbstractQueuedSynchronizer;

    《Chinese_AQS:为中国程序员解析AbstractQueuedSynchronizer》 在Java并发编程领域,AbstractQueuedSynchronizer(AQS)是一个至关重要的组件,它为实现锁和其他同步构建块提供了基础。然而,由于其复杂性和原始...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    本文将基于JDK源码解析Java领域中的并发锁,探讨AQS基础同步器、LockSupport、Condition接口、Lock接口、ReadWriteLock接口以及自定义API操作的设计与实现。 一、AQS(AbstractQueuedSynchronizer)基础同步器的...

    Semaphore 源码解析

    Semaphore的核心实现基于Java并发库中的AbstractQueuedSynchronizer(AQS)类,它是一个抽象类,提供了线程同步的基本框架。Semaphore类有两个内部静态类——NonfairSync和FairSync,分别对应非公平锁和公平锁。在...

    多线程countDownLatch方法介绍

    CountDownLatch内部使用了AQS(AbstractQueuedSynchronizer)来实现同步控制。AQS维护了一个整型状态字段,对于CountDownLatch,这个状态就是计数器的值。`countDown()`方法相当于原子性地对状态进行减1操作,而`...

    基于JDK源码解析Java领域中的并发锁,我们需要特别关注哪些内容?

    基于JDK源码解析Java并发锁,我们需要关注以下几个关键知识点: 1. **AQS(AbstractQueuedSynchronizer)基础同步器**: AQS是一个用于构建锁和同步器的框架,它维护了一个FIFO的等待队列,提供了两种模式:独占和...

    CountDownLatch源码解析之await()

    下面我们将详细解析CountDownLatch源码之await()方法的原理。 首先,我们来看await()方法的源码: public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 可以看到,await...

    Java并发编程原理与实战

    AbstractQueuedSynchronizer(AQS)详解.mp4 使用AQS重写自己的锁.mp4 重入锁原理与演示.mp4 读写锁认识与原理.mp4 细读ReentrantReadWriteLock源码.mp4 ReentrantReadWriteLock锁降级详解.mp4 线程安全性问题简单总结...

    第五章 ReentrantLock源码解析1--获得非公平锁与公平锁lock()1

    本篇文章将深入解析ReentrantLock的源码,重点讨论非公平锁和公平锁的获取过程。 1. **ReentrantLock的基本概念** ReentrantLock是由Java提供的可重入互斥锁,支持公平锁和非公平锁两种模式。非公平锁的特性是获取...

    ReentrantLock源码解析(二)

    ReentrantLock的核心实现基于AbstractQueuedSynchronizer(AQS),这是一个抽象的队列式同步器。AQS维护了一个FIFO的等待队列,用于存储等待锁的线程。每个节点表示一个线程,节点间通过状态字段进行同步。 1.1 AQS...

Global site tag (gtag.js) - Google Analytics