`

ReentrantLock

阅读更多

   昨天看了reentrantLock的源码码,分析一下:

public class ReentrantLock implements Lock, java.io.Serializable {
//调用AbstractQueuedSynchronizer的release方法
    public void unlock() {
        sync.release(1);
    }

//调用FairSync或者NonfairSync的lock方法
    public void lock() {
        sync.lock();
    }

}
//sync抽象类
static abstract class Sync extends AbstractQueuedSynchronizer {
       
        //如果当前锁未被占有,那么当前线程占有锁。如果锁被当前线程占有,更新状态
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
         //如果状态是0,并且当前线程是持有锁的线程//setExclusiveOwnerThread(null);
         protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
  }
 //非公平的sync
  final static class NonfairSync extends Sync {
   //锁住当前对象
   final void lock() {
            //如果没有被锁,即state = 0,set state = 1 
            if (compareAndSetState(0, 1))
                //让当前线程占有锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //调用父类AbstractQueuedSynchronizer的acquire()方法
                acquire(1);
        }
     protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
  }
  //公平的sync
   final static class FairSync extends Sync {
        private static final long serialVersionUID = -  3000897897090466540L;
        //调用AbstractQueuedSynchronizer中的acquire方法
        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //判断前面是不是已经有线程在申请锁,公平锁需要等待
                if (isFirst(current) &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
}

public abstract class AbstractQueuedSynchronizer{
private static final long stateOffset;
static {
        try {
            //state成员变量的偏移量
            stateOffset = unsafe.objectFieldOffset            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
//如果变量state的值等于expect,把state设置成update
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  //设置占有锁的线程
  protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

public final void acquire(int arg) {
        //调用子类的tryAcquire()方法,如果不能加锁acquireQueued返回true,那样//interrupt当前线程
        if (!tryAcquire(arg) &&
             //将当前线程加入等待队列中
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
public final boolean release(int arg) {
        //判定是否可以释放当前的锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); 

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         * 先查看当前节点的下一个节点是不是有效的,如果有效就直接处理。如果无效就从tail开始往前找。默认情况下是会将先等待的节点,先唤醒
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //调用unsafe.unpark方法,unloke当前线程
            LockSupport.unpark(s.thread);
    }

}

static final class Node {
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3; 
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened. 
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果是第一个节点,则尝试去获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取失败,则会被pack
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //获取锁,但是当线程被中断的时候,抛出一个InterruptedException,和acquire方法的区别是acquire方法即使被中断也会继续尝试获取锁    
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

 /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

}

其他参考http://www.iteye.com/topic/623398

思考:unsafe.compareAndSwapInt(this, stateOffset, expect, update);这个方法是线程安全的,有硬件方面的支持

分享到:
评论

相关推荐

    Java并发之ReentrantLock类源码解析

    Java并发之ReentrantLock类源码解析 ReentrantLock是Java并发包中的一种同步工具,它可以实现可重入锁的功能。ReentrantLock类的源码分析对理解Java并发机制非常重要。本文将对ReentrantLock类的源码进行详细分析,...

    java ReentrantLock详解.docx

    `ReentrantLock`是Java并发编程中的一种高级锁机制,它是`java.util.concurrent.locks`包中的类,提供了比`synchronized`关键字更丰富的功能和更细粒度的控制。相较于`synchronized`,`ReentrantLock`的主要优势在于...

    Java中ReentrantLock的使用.docx

    Java中的ReentrantLock是线程安全编程中的一种高级锁机制,它属于Lock接口的一个实现,提供了比synchronized更丰富的功能和更高的灵活性。ReentrantLock的名字来源于它的可重入性,这意味着一个线程可以多次获取同一...

    ReentrantLock源码的使用问题详解.docx

    《ReentrantLock源码详解与应用》 ReentrantLock,可重入锁,是Java并发编程中一个重要的锁实现,它提供了比synchronized更高级别的控制能力,包括公平性和非公平性选择。本文将深入探讨ReentrantLock的原理,特别...

    Lock、Synchoronized和ReentrantLock的使用

    Lock、Synchronized 和 ReentrantLock 的使用 Lock、Synchronized 和 ReentrantLock 是 Java 中三种常用的同步机制,每种机制都有其特点和使用场景。下面对这三种机制进行详细的分析和比较。 一、Synchronized ...

    Java多线程之ReentrantLock与Condition - 平凡希 - 博客园1

    Java中的`ReentrantLock`是Java并发包`java.util.concurrent.locks`中的一个高级锁机制,它是可重入的互斥锁,具有与`synchronized`关键字相似的同步性,但提供了更多的灵活性和控制功能。本篇文章将深入探讨`...

    ReentrantLock解析

    《ReentrantLock深度解析》 在Java并发编程中,ReentrantLock是JDK提供的一个可重入互斥锁,它是java.util.concurrent.locks包下的核心类。与synchronized关键字相比,ReentrantLock提供了更高的灵活性,如尝试加锁...

    ReentrantLock与synchronized

    在Java多线程编程中,`ReentrantLock`和`synchronized`都是用于实现线程同步的重要工具,确保在并发环境中数据的一致性和正确性。两者虽然都能实现互斥访问,但在功能、性能以及使用场景上有所不同。下面我们将深入...

    ReentrantLock与synchronized区别

    java语言 并发编程 ReentrantLock与synchronized区别 详解

    ReentrantLock源码分析

    ### ReentrantLock源码分析 #### 一、ReentrantLock简介 ReentrantLock是一个基于`AbstractQueuedSynchronizer`(AQS)实现的高级锁工具类。与传统的synchronized关键字相比,ReentrantLock提供了更多控制手段,比如...

    ReentrantLock的使用及注意事项

    ReentrantLock的使用及注意事项

    使用ReentrantLock和Lambda表达式让同步更

    在高级并发编程中,`ReentrantLock`是一个强大的工具,相较于内置的`synchronized`关键字,它提供了更多的灵活性和控制。本篇文章将深入探讨`ReentrantLock`的使用以及如何结合Lambda表达式来优化同步代码。 `...

    ReentrantLock 实现原理 1

    ReentrantLock 实现原理详解 ReentrantLock 是 Java 中的一个同步工具类,它实现了 Lock 接口,提供了锁的获取和释放机制。ReentrantLock 的实现原理基于 AQS(AbstractQueuedSynchronizer),是一个重入锁,允许一...

    一张图将整个ReentrantLock流程看懂

    一张图将整个ReentrantLock流程看懂,干货满满 一张图将整个ReentrantLock流程看懂,干货满满 一张图将整个ReentrantLock流程看懂,干货满满 一张图将整个ReentrantLock流程看懂,干货满满 一张图将整个...

    ReentrantLock代码剖析之ReentrantLock_lock

    `ReentrantLock`是Java并发编程中非常重要的一种锁,它提供了比`synchronized`关键字更细粒度的控制,并且在高竞争条件下的性能更优。在本文中,我们将深入分析`ReentrantLock`的`lock()`方法,理解其内部机制,包括...

    locks框架_ReentrantLock.pdf

    ReentrantLock作为Lock接口的一个实现,是这个框架中的核心组件,尤其值得关注。它的主要特点是可重入性,这意味着一个线程可以多次获取同一锁,从而避免了死锁的风险。 ReentrantLock内部依赖于...

    简单聊聊Synchronized和ReentrantLock锁.docx

    本文将深入探讨Synchronized关键字锁和ReentrantLock锁的异同、功能特性以及它们在实际应用中的适用场景。 首先,Synchronized是一种内置的Java关键字,它提供了简单而强大的线程同步机制。当一个线程进入一个由...

    ReentrantLock 与 synchronized 简介

    ### ReentrantLock 与 synchronized 的比较 #### 一、引言 在Java中,多线程和并发控制一直是程序员关注的重点。随着Java的发展,其语言本身及标准库提供了丰富的工具来帮助开发者处理并发问题。其中,`...

    Java多线程ReentrantLock1

    Java的ReentrantLock是Java并发编程中非常重要的一个同步机制,它是JDK 5.0引入的并发包`java.util.concurrent.locks`中的一个高级锁,提供了比synchronized更细粒度的控制,同时具备更高的灵活性和性能。...

    Java 多线程与并发(11-26)-JUC锁- ReentrantLock详解.pdf

    Java中的ReentrantLock是Java并发包(java.util.concurrent.locks)中的一个高级锁,它是可重入的,意味着一个线程可以多次获取同一锁。在深入ReentrantLock之前,我们首先需要了解Java并发编程的基础,特别是Java...

Global site tag (gtag.js) - Google Analytics