`
wubo.wb
  • 浏览: 28524 次
  • 来自: 成都
文章分类
社区版块
存档分类
最新评论
阅读更多

在AQS队列中通过nextWaiter指针串起来的就是条件队列,实际上是通过ConditionObject来实现的。ConditionObject类实现了Condition接口。Condition 实现可以提供不同于 Object 监视器方法的行为和语义。比如一个对象里面可以有多个Condition,可以注册在不同的condition,可以有选择性的调度线程,很灵活。而Synchronized只有一个condition(就是对象本身),所有的线程都注册在这个conditon身上,线程调度不灵活。

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。Condition 实例实质上被绑定到一个锁上。要为特定 Lock实例获得 Condition 实例,请使用其 newCondition() 方法。

条件变量在某个状态条件到达之前一直挂起该线程,由于多个线程都会访问这个共享状态信息,因此这个状态信息必须是线程安全的,这时就需要有锁来支撑。比如等待一个条件变量需要以原子方式释放获取的锁,并挂起当前线程,这和object.wait()类似。

以下是Condition接口的主要方法:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

多个wait方法对应object.wait()方法,signal方法对应object.notify()方法,signalAll方法对应object.notifyAll()方法。下面分析每个方法的实现过程:

 await操作,前面说过await主要做了两件事:释放锁,然后挂起当前线程,以下是它的具体实现:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

 1、通过addConditionWaiter把当前线程加到条件队列

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

a、如果条件队列队尾结点waitStatus不为CONDITION,证明该条件队列可能包含被取消的线程,unlinkCancelledWaiters操作正是踢掉这些waitStatus不为CONDITION的结点。

b、新建一个含有当前线程的结点放到条件队列尾部

c、返回新结点

2、然后是释放当前结点拥有的锁,不然会造成死锁,这是通过fullyRelease来实现的

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 fullyRelease首先获取state,然后调用release方法释放锁,如果释放成功,直接返回,如果释放失败,证明当前结点存在异常,设置当前结点为CANCEL。

 release方法是调用tryRelease来实现的,tryRelease成功后需要唤醒在等待的线程,此处唤醒的是同步队列中的第一个非CANCEL结点。

3、自旋挂起当前线程,直到被唤醒,超时或者被CANCEL.这是通过while循环实现的,如果没有在同步队列中就会一直挂起。是否在同步队列是通过isOnSyncQueue判断的

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
   }

 显然waitStatus为CONDITION这种结点,只属于条件队列,不在同步队列中。

4、 获取锁,并从条件队列中移除,表示它已经拿到锁了。获取锁是通过acquireQueued实现的

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 acquireQueued的过程如下:

a、如果当前结点的前一个结点是傀儡节点,就尝试获取锁,如果获取成功就设置头结点为当前结点。

b、到此表明还有其他结点在排除,当前就应该被挂起,挂起当前线程,设置中断标志,然后返回。

整个await就经过这几步(释放锁---加入条件队列----挂起---条件满足进入同步队列),其他await方法过程差不多,这里不再分析了。

signal就是要将条件队列中的第一个结点唤醒,以下是它的具体实现:

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

  1、首先判断正在运行是不是当前线程,如果不是,则抛出异常!这是通过 isHeldExclusively来实现的,isHeldExclusively方法在AQS中是一个抽象方法,有多种实现,以下是在ReentrantLock中的实现:

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

 2、然后调用doSignal方法唤醒第一个线程

    private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

 doSignal()方法是一个while循环,直到transferForSignal成功为止,transferForSignal做了两件事,1、将结点从条件队列移到同步队列中,2、设置同步中它的前趋结点的waitStatus为SIGNAL,并挂起当前线程。其实doSignal就是不断地完成这样一个transfer(条件队列--->同步队列)操作,直到有一个成功为止。

signalAll就是唤醒条件列表中的所有线程,实现很简单,循环调用signal方法即可。

public final void signalAll() {
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	Node first = firstWaiter;
	if (first != null)
		doSignalAll(first);
}

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
	lastWaiter = firstWaiter = null;
	do {
		Node next = first.nextWaiter;
		first.nextWaiter = null;
		transferForSignal(first);
		first = next;
	} while (first != null);
}
 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    带你看看Java-AQS同步器 源码解读四 条件队列Condition上

    `Lock`接口中提供了`newCondition()`方法来创建一个与该锁关联的新条件对象。 让我们通过一个小例子来理解条件队列的作用。假设有一个资源管理器,只有在资源可用时,线程才能执行某些操作。线程A获取资源,发现...

    深入理解Java中的AQS.docx

    AQS支持条件变量,允许线程在满足特定条件时等待。每个条件变量关联一个单独的等待队列,当调用`signal()`或`signalAll()`时,符合条件的节点会从条件队列转移到同步队列,等待锁的释放。 总之,AQS是Java并发库的...

    java并发编程:juc、aqs

    此外,AQS还支持条件变量(Condition),每个Condition对应一个单向链表,只有在使用条件变量时才会创建。 3. **核心方法**:AQS提供了几个关键的抽象方法,如`tryAcquire()`和`tryRelease()`用于独占模式的资源...

    Java并发编程解析 | 解析AQS基础同步器的设计与实现

    "Java并发编程解析 | 解析AQS基础同步器的设计与实现" 在Java领域中,解决并发编程问题的...在Java领域中,AQS是基于MESA模型的管程技术的实现,MESA模型中引入了条件变量的概念,每个条件变量对应有一个等待队列。

    深入浅出学习AQS组件

    下面对AQS的基本执行过程、队列节点结构、独占锁模式、共享锁模式、条件队列和API进行了详细的介绍。 一、基本执行过程 AQS的基本执行过程就是尝试获取锁,成功则返回,如果失败就进入同步队列进行锁资源的等待。 ...

    并发编程之synchronized&Lock&AQS详解(1)1

    1. `synchronized`:这是一种内置锁,也称为对象锁,它的作用粒度是对象。同步实例方法和同步代码块都会锁定对应对象,防止多个线程同时访问。`synchronized`在JVM层面是基于监视器锁(Monitor)实现的,依赖于操作...

    aqs-并发编程(2)笔记.pdf

    在并发编程中,保护性暂停模式(Guarded Suspension Design Pattern)是一种常用...不过需要注意的是,在实现的时候要考虑到多线程环境中可能出现的竞态条件、死锁和资源竞争等问题,并采取相应的措施进行预防和解决。

    java锁详解.pdf

    2. AQS 的内部实现:AQS 机制的内部实现基于 Node 对象和同步队列。 3. 自定义锁:AQS 机制可以用于实现自定义锁。 六、总结 Java 锁是 Java 并发编程中的一种基本机制,用于确保线程安全和避免竞争条件。了解 ...

    04 并发编程专题07.zip

    AQS的核心是状态变量(state)和基于双向链表的条件队列。通过继承AQS,开发者可以自定义同步器,只需关注如何改变状态以及如何根据状态转移来唤醒或者阻塞线程,而无需关心具体的线程调度和队列操作。 具体到...

    04 并发编程专题08.zip

    线程通过acquire()方法请求资源,如果资源不足(state不满足条件),则进入等待队列;当其他线程释放资源(调用release())时,会唤醒等待队列中的一个线程,使其重新尝试获取资源。 AQS支持自定义同步组件,通过...

    针铁矿的非生物还原解离特征及Slogistic模型拟合

    在上述给定的文件中,研究人员朱维晃、臧辉和黄廷林以针铁矿为研究对象,专注于分析其非生物还原解离特征,并探讨了电子供体、针铁矿浓度以及氧化还原中介体AQS等因素对其非生物还原解离作用的影响。研究的结论表明...

    并发编程,学习手记.pdf

    7. **测试案例**:为了验证并发代码的正确性,通常会编写测试案例,例如`ReentrantReadWriteLock`的读写锁测试、`CountDownLatch`的计数器测试等,这些测试能帮助开发者找出潜在的竞态条件和死锁等问题。 8. **设计...

    Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)

    在AQS中,每个等待的线程都会被封装成一个Node对象,包含线程引用和等待状态。Node的状态包括以下几种: - **CANCELLED**:线程已经取消请求,不再等待。 - **PARKED**:线程已暂停,等待被唤醒或取消。 - ** SIGNAL...

    java面试讲题汇总-word可打印版

    2. == 和 equals 的区别:==比较的是两个对象的引用是否相同,而equals()方法在默认情况下也是比较对象引用,但可以被重写来比较对象内容。 3. final在Java中的作用:final用于声明常量、防止变量被重新赋值、确保...

    04-并发编程面试题-重点.docx

    死锁的条件:互斥、占有并等待、不可剥夺、循环等待。 十二、锁升级 锁升级是指在多线程环境下,如何从低级锁升级到高级锁。 锁升级的原理:从偏向锁升级到轻量级锁,从轻量级锁升级到重量级锁。 十三、AQS AQS...

    多线程面试经典问答.docx

    阻塞状态是线程因等待某些条件满足(如I/O完成或锁释放)而暂停执行。死亡状态是线程执行完毕或被强制停止。 CAS操作是一种非阻塞的原子操作,通过JNI调用CPU的cmpxchg指令,比较并更新内存值。在多线程环境下,CAS...

    Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)

    - **条件队列**:AQS支持条件队列,线程在满足特定条件后才能获取锁,这对于实现复杂的同步逻辑非常有用。 通过对AQS源码的深入学习,我们可以更好地掌握Java并发编程的底层机制,提高并发程序的设计和调试能力。在...

    java并发等待条件的实现原理详解

    `Condition`接口是`AbstractQueuedSynchronizer(AQS)`类的一部分,AQS是Java并发包中许多锁和同步器的基础。每个`Condition`对象都关联了一个等待队列,当线程调用`Condition`的`await()`方法时,线程会被阻塞并加入...

    java面试题2024资源下载

    - wait会释放对象锁,sleep不会释放对象锁。 - **中断响应**: - wait可以被中断,sleep也可以被中断。 #### 二十八、线程池中常见的阻塞队列 - **ArrayBlockingQueue**: - 基于数组结构的有界阻塞队列。 - **...

    【美团】Java 岗 154 道面试题1

    38. **对象被垃圾回收条件**:无引用可达,且经过两次GC标记。 39. **Java 内存分配与回收策略**:新生代和老年代划分, Minor GC 和 Major GC 分别处理年轻代和老年代的垃圾。 40. **JVM 永久代回收**:Java 8...

Global site tag (gtag.js) - Google Analytics