JDK5之前多线程的锁都是使用synchronized ,JDK 5中的锁是接口java.util.concurrent.locks.Lock。另外java.util.concurrent.locks.ReadWriteLock提供了一对可供读写并发的锁。ReentrantLock是java.util.concurrent.locks中的一个可重入锁类。在高竞争条件下有更好的性能,且可以中断。深入剖析ReentrantLock的源码有助于我们了解线程调度,锁实现,中断,信号触发等底层机制,实现更好的并发程序。
先来看ReentrantLock最常用的代码lock
public void lock() {
sync.lock();
}
代码很简单,直接调用成员变量sync的lock方法
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
Sync 是ReentrantLock的抽象静态内部类继承了AbstractQueuedSynchronizer ,后面会看到很多操作其实都是通过AbstractQueuedSynchronizer 来实现的,AbstractQueuedSynchronizer 是一个很重要的类型,concurrent 包里很多实现都依赖他。完整的设计思想可以参考http://gee.cs.oswego.edu/dl/papers/aqs.pdf
。FairSync和NonFairSync则是具体的子类,分别对应了公平锁和非公平锁。其实这两个都差不多,了解其中一个去看另一个其实差不多。实际中公平锁吞吐量比非公平锁小很多,所以以下分析以非公平锁为例。 在说具体的实现前不得不说AbstractQueuedSynchronizer,最重要的两个数据成员当前锁状态和等待链表都是由它来实现的。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
state记录了当前锁被锁定的次数。如果为0则未被锁定。加锁通过更改状态实现,而更改状态主要由函数compareAndSetState实现,调用cas原语以保证操作的原子性。Node是静态内部类,重要的字段如下
/**
* 节点的等待状态,一个节点可能位于以下几种状态:
* CANCELLED = 1: 节点操作因为超时或者对应的线程被interrupt。节点不应该不留在此状态,一旦达到此状态将从CHL队列中踢出。
* SIGNAL = -1: 节点的继任节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park()操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。
* CONDITION = -2:表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
* 0: 正常状态,新生的非CONDITION节点都是此状态。
* 非负值标识节点不需要被通知(唤醒)。
*/
volatile int waitStatus;
/**
* 此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。
*/
volatile Node prev;
/**
* 此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。
*/
volatile Node next;
/**
* 节点绑定的线程。
*/
volatile Thread thread;
/**
* 下一个等待条件(Condition)的节点,由于Condition是独占模式,
* 因此这里有一个简单的队列来描述Condition上的线程节点。
*/
Node nextWaiter;
而另一个重要的属性则在AbstractQueuedSynchronizer 的父类AbstractOwnableSynchronizer中。
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
了解这些基础之后,来看NonFairSync 的 lock函数
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//如果锁没有被任何线程锁定,则用cas方式进行抢占
if (compareAndSetState(0, 1))
//如果获取锁成功则设定当前线程为锁的拥有者
setExclusiveOwnerThread(Thread.currentThread());
else
//如果锁已经被占用,则尝试加锁,
acquire(1);
}
这里说下acquir方法,这个方法由AbstractQueuedSynchronizer 提供
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.如果tryAcquire(arg)成功,那就没有问题,已经拿到锁,整个lock()过程就结束了。如果失败进行操作2。
2.addWaiter创建一个独占节点(Node)并且此节点加入CHL队列末尾(稍后分析)。进行操作3。
3.acquireQueued自旋尝试获取锁,失败根据前一个节点来决定是否挂起(park()),直到成功获取到锁。进行操作4。
4.selfInterrupt如果当前线程已经中断过,那么就中断当前线程(清除中断位)。
这里有点复杂,接下来会一步一步分析。
tryAcquire 被NonFairSync override,直接调用 Sync.nonfairTryAcquire,代码如下
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁空闲则尝试锁定
if (c == 0) {
//获取成功则设当前线程为锁拥有者
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//若当前线程为锁拥有者则直接修改锁状态计数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
有点似曾相识的感觉,跟NonFairSync
的 lock函数有点类似。如果该方法失败返回false,也就是tryAcquire失败,进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 。先来看addWaiter函数
/**
* Creates and enqueues node for given thread and mode.
*
* @param current the thread
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
将当前线程封装成一个node然后放入AbstractQueuedSynchronizer的node队列。这里考虑了队列为空和多线程并发的情况,所以处理的比较纠结,不过代码倒是不复杂,耐心看可以理解。
上面是节点如队列的一部分。当前仅当队列不为空并且将新节点插入尾部成功后直接返回新节点。否则进入enq(Node)进行操作。
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果为空就创建头结点
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
//如果这个时候因为并发,队列已经非空,那就把当前的node放入队尾
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
接着就是调用
acquireQueued 方法,让线程进入禁用状态,并在每次被唤醒时尝试获取锁,失败则继续禁用线程。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果当前node是head的直接后继则尝试获取锁
// 这里不会和等待队列中其它线程发生竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争。这是非公平锁和公平锁的一个重要区别。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 如果不是head的后继或获取锁失败,则检查是否要禁用当前线程
// 是则禁用,直到被lock.release唤醒或线程中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
shouldParkAfterFailedAcquire做了一件很重要的事:根据状态对等待队列进行清理,并设置等待信号。这里需要先说明一下waitStatus,它是AbstractQueuedSynchronizer的静态内部类Node的成员变量,用于记录Node对应的线程等待状态.等待状态在刚进入队列时都是0,如果等待被取消则被设为Node.CANCELLED,若线程释放锁时需要唤醒等待队列里的其它线程则被置为Node.SIGNAL,还有一种状态Node.CONDITION这里先不讨论。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
*如果前一个节点的等待状态waitStatus 被设置为SIGNAL,也就是前面的节点还没有获得到锁,
*那么返回true,表示当前节点(线程)就应该park()了
*/
return true;
if (ws > 0) {
/*
* 如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,
*那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //等于0的时候
//前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,
//表示后面有节点等待处理,需要根据它的等待状态来决定是否该park()
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
/*
*这里一定要返回false,有可能前置结点这时已经释放了锁,
*但因其 waitStatus在释放锁时还未被置为SIGNAL而未触发唤醒等待线程操作,
*因此必须通过return false来重新尝试一次获取锁
*/
return false;
}
如果shouldParkAfterFailedAcquire 返回true那么会调用
parkAndCheckInterrupt
。实现如下,很简单,直接禁用线程,并等待被唤醒或中断发生。对java中Thread.interrupted()都作了什么不甚了解的要做功课。这里线程即被堵塞,醒来时会重试获取锁,失败则继续堵塞。即使Thread.interrupted()也无法中断。那些想在等待时间过长时中断退出的线程可以调用ReentrantLoc.lockInterruptibly()。
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
分享到:
相关推荐
Java中的锁机制包括内置锁(synchronized)和显式锁(Lock),如`ReentrantLock`。内置锁简单易用,而显式锁提供了更细粒度的控制,支持公平性、可中断和尝试加锁等特性。 8. **并发模式** 并发编程中有多种设计...
例如,使用无锁数据结构或原子操作(`java.util.concurrent.atomic`包)。 3. **避免死锁、活锁和饥饿**:理解并预防这些并发问题至关重要。死锁发生在两个或多个线程相互等待对方释放资源导致僵局;活锁是线程不断...
5. **锁工具的使用**:探索Java中提供的各种锁工具类,如`java.util.concurrent.locks`包下的类。 #### 六、状态依赖性 这部分内容可能进一步探讨了如何根据对象的状态来控制并发行为,以及如何处理状态变化引起的...
Java锁机制是Java多线程编程中的核心概念之一,其主要目的是确保在多线程环境下,多个线程能够安全地访问共享资源,避免数据不一致的问题。Java锁机制的发展历经了多个版本的改进,尤其是Java 5.0引入的显示锁...
Java并发工具包`java.util.concurrent.locks`提供了更高级别的锁工具,如`ReentrantLock`、`ReadWriteLock`等,这些工具提供了更灵活的锁管理方式,有助于提高程序的并发性和可维护性。 #### 七、并发处理实践 ...
2. **并发控制**:Java提供了多种并发控制机制,如synchronized关键字、volatile变量、java.util.concurrent包下的锁和同步工具类(如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier)。这些机制用于解决...
10. **Lock锁**:除了传统的`synchronized`关键字,Java并发包还提供了`ReentrantLock`(可重入锁)和`ReadWriteLock`(读写锁),提供了更细粒度的锁控制和更高的并发性。 理解和掌握这些概念及工具,对于编写高效...
2. **ReentrantLock类**:位于`java.util.concurrent.locks`包中,提供了一个更高级别的锁机制,相比于`synchronized`关键字,它提供了更多的灵活性,比如公平锁和非公平锁的选择、可中断的等待等。 ```java ...
Java并发工具类库(java.util.concurrent)是并发编程中的另一个重要主题,包括Atomic类(提供原子操作)、Semaphore(信号量)、CountDownLatch(计数器门锁)、CyclicBarrier(循环栅栏)和Exchanger(交换器)等...
Java锁机制是多线程编程中的关键组成部分,用于控制对共享资源的访问,确保并发环境下的数据一致性。本文将深入探讨Java锁机制,并基于提供的"面向Java锁机制的字节码自动重构框架"来讨论其背后的原理和应用。 在...
本资源——`java_concurrent` 源码,提供了对Java并发包的深入学习材料以及实践示例,旨在帮助开发者深入理解并发编程背后的机制。 在`java.util.concurrent`包中,有几个重要的类和接口,它们构成了Java并发编程的...
Java的并发库提供了一系列工具和API,如`java.util.concurrent`包,帮助开发者有效地管理并发任务。本书主要涵盖以下几个方面: 1. **线程基础**:书中首先介绍了线程的基本概念,包括如何创建和管理线程,以及线程...
- **锁和同步机制**:库中包含了各种锁和同步工具,如读写锁、信号量、条件变量等,这些工具可以帮助开发者实现更复杂的并发控制逻辑,避免数据竞争和死锁。 - **并发容器**:扩展了Java的并发集合,比如线程安全...
锁机制是保证并发操作正确性的关键技术,Doug Lea在书中对Java中的各种锁(如synchronized关键字、ReentrantLock等)进行了详细的介绍。状态依赖是指多个线程操作共享状态时需要考虑的问题,包含关系和分割问题则...
- **同步机制**:为了防止多个线程同时访问共享资源导致的数据不一致问题,Java提供了多种同步机制,如`synchronized`关键字、`ReentrantLock`等。 - **不可变对象**:使用不可变对象可以简化多线程编程,因为不可变...
`java.util.concurrent.atomic`包包含了一系列原子类,如`AtomicInteger`、`AtomicLong`和`AtomicReference`等,它们提供了原子操作,可以在不使用锁的情况下保证线程安全。 7. **并发工具类** `java.util....
在Java中,`java.util.concurrent.locks.ReentrantLock`是实现锁的一种高级机制,它是线程安全的,并且具有与`synchronized`关键字相似的功能,但提供了更灵活的使用方式。可重入锁的名字"Reentrant"来源于它支持...
Java锁机制是多线程编程中的核心概念,用于控制对共享资源的并发访问,防止数据的不一致性。Java提供了多种锁机制,包括内置锁(也称为监视器锁)和显式锁。本文将详细解析Java锁机制及其应用。 1. **内置锁(监视...