`
zhhphappy
  • 浏览: 121316 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ReentrantLock AQS 源码阅读笔记

    博客分类:
  • java
 
阅读更多

ReentrantLock是JDK1.5引入的,它拥有与synchronized相同的并发性和内存语义,并提供了超出synchonized的其他高级功能(例如,中断锁等候、条件变量等),并且使用ReentrantLock比synchronized能获得更好的可伸缩性

AQS(AbstractQueuedSynchronizer)主要利用硬件原语指令(CAS compare-and-swap),来实现轻量级多线程同步机制,并且不会引起CPU上文切换和调度,同时提供内存可见性和原子化更新保证(线程安全的三要素:原子性、可见性、顺序性)。

AQS的本质上是一个同步器/阻塞锁的基础框架,其作用主要是提供加锁、释放锁,并在内部维护一个FIFO等待队列,用于存储由于锁竞争而阻塞的线程。

 

ReentrantLock的实现核心是基于AQS(AbstractQueuedSynchronizer)来实现的.

 

先看ReentrantLock的构造方法和核心的类:

public ReentrantLock() {
        sync = new NonfairSync();
    }
	
	public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }
	
	private final Sync sync;
	
	static abstract class Sync extends AbstractQueuedSynchronizer {
		...
    }
	
	final static class NonfairSync extends Sync {
		...
	}
	
	final static class FairSync extends Sync {
		...
	}

 通过构造方法可以看出:ReentrantLock 中的 Sync 继承自 AbstractQueuedSynchronizer, 其还有两个子类 NonfairSync 和 FairSync, 从构造方法中看出这两个子类决定了ReentrantLock的公平性问题。

这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的,也就是说等待时间最长的线程最有机会获取锁,也可以说锁的获取是有序的。

 

非公平

// ReentrantLock.NonfairSync
	final boolean nonfairTryAcquire(int acquires) {
		final Thread current = Thread.currentThread();
		int c = getState();
		if (c == 0) {
			if (compareAndSetState(0, acquires)) {
				setExclusiveOwnerThread(current);
				return true;
			}
		}
		else if (current == getExclusiveOwnerThread()) {
			int nextc = c + acquires;
			if (nextc < 0) // overflow
				throw new Error("Maximum lock count exceeded");
			setState(nextc);
			return true;
		}
		return false;
	}

 公平:比较非公平的获取,仅加入了当前线程(Node)之前是否有前置节点在等待的判断hasQueuedPredecessors()

// ReentrantLock.FairSync
	protected final boolean tryAcquire(int acquires) {
		final Thread current = Thread.currentThread();
		int c = getState();
		if (c == 0) {
			if (!hasQueuedPredecessors() &&
				compareAndSetState(0, acquires)) {
				setExclusiveOwnerThread(current);
				return true;
			}
		}
		else if (current == getExclusiveOwnerThread()) {
			int nextc = c + acquires;
			if (nextc < 0)
				throw new Error("Maximum lock count exceeded");
			setState(nextc);
			return true;
		}
		return false;
	}

 获取锁操作(先以非公平模式来看)

// ReentrantLock
	public void lock() {
		sync.lock();
	}

	// ReentrantLock.NonfairSync
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
	}
	
	// AbstractOwnableSynchronizer
	protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }
	
	// AbstractQueuedSynchronizer
	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
	
	// ReentrantLock.NonfairSync
	protected final boolean tryAcquire(int acquires) {
		return nonfairTryAcquire(acquires);
	}

 通过原子的比较并设置操作,如果成功设置,说明锁是空闲的,当前线程获得锁,并把当前线程设置为锁拥有者;否则,调用acquire方法。

如果尝试以独占的方式获得锁失败,那么就把当前线程封装为一个Node,加入到等待队列中;如果加入队列成功,接下来检查当前线程的节点是否应该等待(挂起),如果当前线程所处节点的前一节点的等待状态小于0,则通过LockSupport挂起当前线程;无论线程是否被挂起,或者挂起后被激活,都应该返回当前线程的中断状态,如果处于中断状态,需要中断当前线程。

最后tryAcquire调用到最开始的那段非公平锁的代码中,处理逻辑如下:

1.如果锁状态空闲(state=0),且通过原子的比较并设置操作,那么当前线程获得锁,并把当前线程设置为锁拥有者;

2.如果锁状态空闲,且原子的比较并设置操作失败,那么返回false,说明尝试获得锁失败;

3.否则,检查当前线程与锁拥有者线程是否相等(表示一个线程已经获得该锁,再次要求该锁,这种情况叫可重入锁),如果相等,维护锁状态,并返回true;

4.如果不是以上情况,说明锁已经被其他的线程持有,直接返回false;

 

再看addWaiter方法

// AbstractQueuedSynchronizer
	private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

 如果tail节点不为null,说明队列不为空,则把新节点加入到tail的后面,返回当前节点,否则进入enq进行处理;

// AbstractQueuedSynchronizer
	private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 如果tail节点为null,说明队列为空,需要建立一个虚拟的头节点,并把封装了当前线程的节点设置为尾节点;另外一种情况的发生,是由于在(1)中的compareAndSetTail可能会出现失败,这里采用for的无限循环,是要保证当前线程能够正确进入等待队列;

	// AbstractQueuedSynchronizer
 	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 如果当前节点是队列的头结点(如果第一个节点是虚拟节点,那么第二个节点实际上就是头结点了),就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点(不管第一个结点是否是虚拟节点),返回中断状态。

检测当前节点是否应该park,如果应该park就挂起当前线程并且返回当前线程中断状态。

        /** waitStatus value to indicate thread has cancelled */
	static final int CANCELLED =  1;
	/** waitStatus value to indicate successor's thread needs unparking */
	static final int SIGNAL    = -1;
	/** waitStatus value to indicate thread is waiting on condition */
	static final int CONDITION = -2;
	/**
	 * waitStatus value to indicate the next acquireShared should
	 * unconditionally propagate
	 */
	static final int PROPAGATE = -3;
	
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
	
	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

 1.如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行(2)。 

2.如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行(4)。否则进行(3)。 

3.前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。进行(4)。 

4.返回false,表示线程不应该park()。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics