`
ykdsg
  • 浏览: 17164 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

java concurrent (1): 锁机制

 
阅读更多

JDK5之前多线程的锁都是使用synchronized ,JDK 5中的锁是接口java.util.concurrent.locks.Lock。另外java.util.concurrent.locks.ReadWriteLock提供了一对可供读写并发的锁。ReentrantLock是java.util.concurrent.locks中的一个可重入锁类。在高竞争条件下有更好的性能,且可以中断。深入剖析ReentrantLock的源码有助于我们了解线程调度,锁实现,中断,信号触发等底层机制,实现更好的并发程序。

先来看ReentrantLock最常用的代码lock

 public void lock() {
        sync.lock();
    }

代码很简单,直接调用成员变量sync的lock方法

 /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

Sync 是ReentrantLock的抽象静态内部类继承了AbstractQueuedSynchronizer ,后面会看到很多操作其实都是通过AbstractQueuedSynchronizer 来实现的,AbstractQueuedSynchronizer 是一个很重要的类型,concurrent 包里很多实现都依赖他。完整的设计思想可以参考http://gee.cs.oswego.edu/dl/papers/aqs.pdf 。FairSync和NonFairSync则是具体的子类,分别对应了公平锁和非公平锁。其实这两个都差不多,了解其中一个去看另一个其实差不多。实际中公平锁吞吐量比非公平锁小很多,所以以下分析以非公平锁为例。 在说具体的实现前不得不说AbstractQueuedSynchronizer,最重要的两个数据成员当前锁状态和等待链表都是由它来实现的。

/**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

state记录了当前锁被锁定的次数。如果为0则未被锁定。加锁通过更改状态实现,而更改状态主要由函数compareAndSetState实现,调用cas原语以保证操作的原子性。Node是静态内部类,重要的字段如下

/**
     * 节点的等待状态,一个节点可能位于以下几种状态:
     * CANCELLED = 1: 节点操作因为超时或者对应的线程被interrupt。节点不应该不留在此状态,一旦达到此状态将从CHL队列中踢出。
     * SIGNAL = -1: 节点的继任节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park()操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。
     * CONDITION = -2:表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
     * 0: 正常状态,新生的非CONDITION节点都是此状态。
     * 非负值标识节点不需要被通知(唤醒)。
     */
    volatile int waitStatus;
	/**
     * 此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。
     */
    volatile Node prev;

    /**
     * 此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。
     */
    volatile Node next;

    /**
     * 节点绑定的线程。
     */
    volatile Thread thread;

    /**
     * 下一个等待条件(Condition)的节点,由于Condition是独占模式,
     * 因此这里有一个简单的队列来描述Condition上的线程节点。
     */
    Node nextWaiter;

而另一个重要的属性则在AbstractQueuedSynchronizer 的父类AbstractOwnableSynchronizer中。

/**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

了解这些基础之后,来看NonFairSync 的 lock函数

/**
	 * Performs lock.  Try immediate barge, backing up to normal
	 * acquire on failure.
	 */
	final void lock() {
		//如果锁没有被任何线程锁定,则用cas方式进行抢占
		if (compareAndSetState(0, 1))
			//如果获取锁成功则设定当前线程为锁的拥有者 
			setExclusiveOwnerThread(Thread.currentThread());
		else
			//如果锁已经被占用,则尝试加锁,
			acquire(1);
	}

这里说下acquir方法,这个方法由AbstractQueuedSynchronizer 提供

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
1.如果tryAcquire(arg)成功,那就没有问题,已经拿到锁,整个lock()过程就结束了。如果失败进行操作2。

2.addWaiter创建一个独占节点(Node)并且此节点加入CHL队列末尾(稍后分析)。进行操作3。

3.acquireQueued自旋尝试获取锁,失败根据前一个节点来决定是否挂起(park()),直到成功获取到锁。进行操作4。

4.selfInterrupt如果当前线程已经中断过,那么就中断当前线程(清除中断位)。

这里有点复杂,接下来会一步一步分析。

tryAcquire 被NonFairSync override,直接调用 Sync.nonfairTryAcquire,代码如下

 /**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
	    //如果锁空闲则尝试锁定
            if (c == 0) {
		//获取成功则设当前线程为锁拥有者  
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
	    //若当前线程为锁拥有者则直接修改锁状态计数  
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
有点似曾相识的感觉,跟NonFairSync 的 lock函数有点类似。如果该方法失败返回false,也就是tryAcquire失败,进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 。先来看addWaiter函数

/**
     * Creates and enqueues node for given thread and mode.
     *
     * @param current the thread
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
将当前线程封装成一个node然后放入AbstractQueuedSynchronizer的node队列。这里考虑了队列为空和多线程并发的情况,所以处理的比较纠结,不过代码倒是不复杂,耐心看可以理解。上面是节点如队列的一部分。当前仅当队列不为空并且将新节点插入尾部成功后直接返回新节点。否则进入enq(Node)进行操作。

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
	    //如果为空就创建头结点
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
	    //如果这个时候因为并发,队列已经非空,那就把当前的node放入队尾
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


接着就是调用acquireQueued 方法,让线程进入禁用状态,并在每次被唤醒时尝试获取锁,失败则继续禁用线程。

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
		// 如果当前node是head的直接后继则尝试获取锁  
		// 这里不会和等待队列中其它线程发生竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争。这是非公平锁和公平锁的一个重要区别。  
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
		// 如果不是head的后继或获取锁失败,则检查是否要禁用当前线程  
                // 是则禁用,直到被lock.release唤醒或线程中断  
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }
shouldParkAfterFailedAcquire做了一件很重要的事:根据状态对等待队列进行清理,并设置等待信号。这里需要先说明一下waitStatus,它是AbstractQueuedSynchronizer的静态内部类Node的成员变量,用于记录Node对应的线程等待状态.等待状态在刚进入队列时都是0,如果等待被取消则被设为Node.CANCELLED,若线程释放锁时需要唤醒等待队列里的其它线程则被置为Node.SIGNAL,还有一种状态Node.CONDITION这里先不讨论。

 /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
		*如果前一个节点的等待状态waitStatus 被设置为SIGNAL,也就是前面的节点还没有获得到锁,
		*那么返回true,表示当前节点(线程)就应该park()了
             */
            return true;
        if (ws > 0) {
            /*
             * 如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,
	     *那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0。
             */
			do {
				node.prev = pred = pred.prev;
			} while (pred.waitStatus > 0);
			pred.next = node;
        } else { //等于0的时候
			//前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,
			//表示后面有节点等待处理,需要根据它的等待状态来决定是否该park()
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        } 
		/* 
		 *这里一定要返回false,有可能前置结点这时已经释放了锁,
		 *但因其 waitStatus在释放锁时还未被置为SIGNAL而未触发唤醒等待线程操作,
		 *因此必须通过return false来重新尝试一次获取锁  
		*/
		return false;
    }
如果shouldParkAfterFailedAcquire 返回true那么会调用parkAndCheckInterrupt 。实现如下,很简单,直接禁用线程,并等待被唤醒或中断发生。对java中Thread.interrupted()都作了什么不甚了解的要做功课。这里线程即被堵塞,醒来时会重试获取锁,失败则继续堵塞。即使Thread.interrupted()也无法中断。那些想在等待时间过长时中断退出的线程可以调用ReentrantLoc.lockInterruptibly()。

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

分享到:
评论

相关推荐

    java concurrent 精简源码

    Java中的锁机制包括内置锁(synchronized)和显式锁(Lock),如`ReentrantLock`。内置锁简单易用,而显式锁提供了更细粒度的控制,支持公平性、可中断和尝试加锁等特性。 8. **并发模式** 并发编程中有多种设计...

    Java并发编程:设计原则与模式(Concurrent.Programming.in.Java)(中英版)

    例如,使用无锁数据结构或原子操作(`java.util.concurrent.atomic`包)。 3. **避免死锁、活锁和饥饿**:理解并预防这些并发问题至关重要。死锁发生在两个或多个线程相互等待对方释放资源导致僵局;活锁是线程不断...

    Concurrent Programming in Java™: Design Principles and Patterns 2nd

    5. **锁工具的使用**:探索Java中提供的各种锁工具类,如`java.util.concurrent.locks`包下的类。 #### 六、状态依赖性 这部分内容可能进一步探讨了如何根据对象的状态来控制并发行为,以及如何处理状态变化引起的...

    Java锁机制详解.pdf

    Java锁机制是Java多线程编程中的核心概念之一,其主要目的是确保在多线程环境下,多个线程能够安全地访问共享资源,避免数据不一致的问题。Java锁机制的发展历经了多个版本的改进,尤其是Java 5.0引入的显示锁...

    Java Concurrent Programming

    Java并发工具包`java.util.concurrent.locks`提供了更高级别的锁工具,如`ReentrantLock`、`ReadWriteLock`等,这些工具提供了更灵活的锁管理方式,有助于提高程序的并发性和可维护性。 #### 七、并发处理实践 ...

    Java并发编程:设计原则与模式(第二版)-3

    2. **并发控制**:Java提供了多种并发控制机制,如synchronized关键字、volatile变量、java.util.concurrent包下的锁和同步工具类(如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier)。这些机制用于解决...

    java concurrent 包 详细解析

    10. **Lock锁**:除了传统的`synchronized`关键字,Java并发包还提供了`ReentrantLock`(可重入锁)和`ReadWriteLock`(读写锁),提供了更细粒度的锁控制和更高的并发性。 理解和掌握这些概念及工具,对于编写高效...

    探索Java并发的基石:同步机制的全面解析

    2. **ReentrantLock类**:位于`java.util.concurrent.locks`包中,提供了一个更高级别的锁机制,相比于`synchronized`关键字,它提供了更多的灵活性,比如公平锁和非公平锁的选择、可中断的等待等。 ```java ...

    java并发编程:设计原则与模式.rar

    Java并发工具类库(java.util.concurrent)是并发编程中的另一个重要主题,包括Atomic类(提供原子操作)、Semaphore(信号量)、CountDownLatch(计数器门锁)、CyclicBarrier(循环栅栏)和Exchanger(交换器)等...

    面向Java锁机制的字节码自动重构框架.zip

    Java锁机制是多线程编程中的关键组成部分,用于控制对共享资源的访问,确保并发环境下的数据一致性。本文将深入探讨Java锁机制,并基于提供的"面向Java锁机制的字节码自动重构框架"来讨论其背后的原理和应用。 在...

    javaconcurrent源码-java_concurrent:javaconcurrent包源代码学习,及相关实践示例

    本资源——`java_concurrent` 源码,提供了对Java并发包的深入学习材料以及实践示例,旨在帮助开发者深入理解并发编程背后的机制。 在`java.util.concurrent`包中,有几个重要的类和接口,它们构成了Java并发编程的...

    Concurrent Programming in Java

    Java的并发库提供了一系列工具和API,如`java.util.concurrent`包,帮助开发者有效地管理并发任务。本书主要涵盖以下几个方面: 1. **线程基础**:书中首先介绍了线程的基本概念,包括如何创建和管理线程,以及线程...

    atlassian-util-concurrent-0.0.12.jar.zip

    - **锁和同步机制**:库中包含了各种锁和同步工具,如读写锁、信号量、条件变量等,这些工具可以帮助开发者实现更复杂的并发控制逻辑,避免数据竞争和死锁。 - **并发容器**:扩展了Java的并发集合,比如线程安全...

    concurrent programming in java(doug lea)

    锁机制是保证并发操作正确性的关键技术,Doug Lea在书中对Java中的各种锁(如synchronized关键字、ReentrantLock等)进行了详细的介绍。状态依赖是指多个线程操作共享状态时需要考虑的问题,包含关系和分割问题则...

    使用Java并发编程Concurrent Programming Using Java

    - **同步机制**:为了防止多个线程同时访问共享资源导致的数据不一致问题,Java提供了多种同步机制,如`synchronized`关键字、`ReentrantLock`等。 - **不可变对象**:使用不可变对象可以简化多线程编程,因为不可变...

    java-concurrent:Java并发

    `java.util.concurrent.atomic`包包含了一系列原子类,如`AtomicInteger`、`AtomicLong`和`AtomicReference`等,它们提供了原子操作,可以在不使用锁的情况下保证线程安全。 7. **并发工具类** `java.util....

    java-concurrent:java中的并发

    在Java中,`java.util.concurrent.locks.ReentrantLock`是实现锁的一种高级机制,它是线程安全的,并且具有与`synchronized`关键字相似的功能,但提供了更灵活的使用方式。可重入锁的名字"Reentrant"来源于它支持...

    java锁机制详解.pdf

    Java锁机制是多线程编程中的核心概念,用于控制对共享资源的并发访问,防止数据的不一致性。Java提供了多种锁机制,包括内置锁(也称为监视器锁)和显式锁。本文将详细解析Java锁机制及其应用。 1. **内置锁(监视...

Global site tag (gtag.js) - Google Analytics