`
bruce008
  • 浏览: 173082 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

java.util.concurrent 包下的 Synchronizer 框架

阅读更多

看完书 java concurrency in practice 当然是想找点啥好玩的东东玩玩。 当看到了Doug Lee 的论文 << The java.util.concurrent Synchronizer Framework >> 大呼来的太晚喔, 前段时间看那个ReentrantLock 的代码真的是痛苦啊,不过现在也不晚不是。  呵呵, 上菜:这个框架的核心是一个AbstractQueuedSynchronizer 类 (下面简称AQS)  它基本上的思路是:

 

  •     采用Template Method Pattern.  它实现了non-contended 的synchronization 算法;
  •     继承 它的Subclass  一般不直接作为Synchronzier, 而是作为私有的实现 被用来delegate.  比如 他举了个例子:

     class Mutex implements Lock, java.io.Serializable {

   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     .....
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

 

在这个 AQS 类中 提供了 两大类方法 

      acquire   其中还包括  非阻塞的 tryAcquire;  带 time out 的;可以通过interruption 来cancellality 的。

 

      release   相对简单点, 因为在调用release 方法的时候基本上暗含了个意思当前的线程是获得了锁的。  

 

在JUC 包中没有为各种Synchronizer 类提供统一的API 比如 Lock.lock, Semaphore.acquire, CountDownLatch.await  and  FutureTask.get 都是映射到这个AQS 的 acquire 方法。 

 

要实现一个Synchronizer 的基本思路非常直接的 

  acquire 操作

    采用CAS 操作去更新同步状态  如果不成功  {

enqueue 当前线程

        block 当前线程

}

   dequeue  当前线程  if it was queued      // 此时应该是当前线程被别的线程 release 来唤醒的。

 

 

release 操作 :

    更新同步状态, 此时可以直接 set 而不通过 CAS。

    假如 等待队列中还有线程  unblock  one or more

 

 

要支持这些操作 需要 三类基本的组件:

 1,   Atomically managing synchronization state。   这个通过  

private   volatile  int   state;

         在处理acquire 的时候基本是通过用CAS 实现的  compareAndSetState 来

 

 2,   Block /Unblock 线程 。 这个是通过 JUC 里面的一个封装类 LockSupport.park / unpark 来实现的。LockSupport 都deleget 到了 Unsafte 对应的方法

 

3, 维护 一个队列来存放 等待线程。 这个是通过一个 CLH queue 的变种。 它是一种linked queue 的通过 head 和tail 。 

 

三个里面最复杂的就是这个 CLH queue 的维护了, queue 中节点被定义为  :

   static final class Node {

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
        }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
  }
 

 

AQS 中定义了 

 

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
 

这个方法通常在 acquire 的时候如果 tryAcquire 失败就会将insert 一个 Node 到 tail 之后, 并且 node.prev 指向先前的 tail。 当然这个是在 compareAndSetTail(pred, node)  返回true 的时候比较简单, 否则就进入 enq(node) 方法 差不多也是这个逻辑 insert 一个 Node 到 tail 之后。

 

 

在 1.6.0_25 版本里面AQS 就没有在 release 的时候 通过唤醒等待队列里面head指向的线程, 然后然后该线程是在 方法里面继续resume, 基本上会 

 

if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }。
 就是将head的下一个节点设为Head, 原来的head 呢 p.next = null,  p 不再引用任何对象, gc 就会发生作用。  这个基本上会的例外是啥呢, 新线程acquire 调用 tryAcquire 抢占成功 。 这个线程就只有继续park 了。 

基本上来讲 有了 这个 AQS  , 我们要想实现一个 Synchronizer 就比较简单了,  最简单的情况下  treAcquire , tryRelease 实现下。AQS 里面提供的需要继承的方法不是采用abstract 的方式 而是用抛出 notImplementedException的 方式, 如果像那些tryAcquireShared 之类的 在 exclusive 模式下根本就不会被调用到,  我们也就根本就不必override。  比如 先前我们所的那个 Mutex 我们只需要定义它的内部类 Sync

 

 

// Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Report whether in locked state
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // Acquire the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Release the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Provide a Condition
     Condition newCondition() { return new ConditionObject(); }

   
   }
 

 

这个框架是非常精巧的,  还有很多地方比如那个 ConditionObject 里面涉及到另外一个单独的condition queue。只有再慢慢的啃了。   

 

另外贴一张本人手工画的图,非常潦草多包涵了。  

 

 

AbstractQueuedSynchronizer

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    The java.util.concurrent Synchronizer Framework

    ### Java.util.concurrent.Synchronizer框架详解 #### 一、引言与背景 随着Java技术的发展,多线程编程成为了一项重要的技术需求。为了更好地支持并发编程,Java平台在J2SE 1.5版本中引入了`java.util.concurrent`...

    The java. util. concurrent synchronizer framework.pdf

    AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...

    The java.util.concurrent synchronizer framework.pdf

    文档标题“java.util.concurrent同步器框架”和描述“Doug Lea的java.util.concurrent同步器框架”表明本文将探讨由Doug Lea所撰写的关于Java并发编程中同步器框架的内容。文档中提到了AbstractQueuedSynchronizer类...

    java5 thread

    Java5新增了三个多线程相关的包,分别是`java.util.concurrent`、`java.util.concurrent.atomic`和`java.util.concurrent.locks`。这些包提供了大量高级的并发控制工具和数据结构,使得开发者可以更加高效和安全地...

    hibernatesynchronizer_3.1.9 for JDK1.4

    在JDK1.4的时代,多线程编程虽然已经得到广泛应用,但并发控制的机制相对较为原始,比如仅依赖于`synchronized`关键字和`java.util.concurrent`库中的有限工具。`hibernatesynchronizer`的出现,就是为了增强...

    Java并发安全控制(培训)

    1. **原子变量类**:`java.util.concurrent.atomic`包中的类提供了基本类型的线程安全版本,例如`AtomicInteger`。 2. **内部锁**:`synchronized`关键字可以用于同步代码块或方法,简化同步逻辑。 3. **显式锁**...

    Synchronizer-Threads

    2. **Lock接口**:Java.util.concurrent.locks包中的Lock接口提供了比synchronized更细粒度的锁控制。与synchronized不同,Lock需要显式地获取和释放锁。这使得Lock具有更高的灵活性,例如可重入性、公平性选择以及...

    Java并发编程实战

    14.6 java.util.concurrent同步器类中的 AQS257 14.6.1 ReentrantLock257 14.6.2 Semaphore与CountDownLatch258 14.6.3 FutureTask259 14.6.4 ReentrantReadWriteLock259 第15章 原子变量与非阻塞同步机制261 ...

    Java实习生面试复习(七):synchronized和ReentrantLock的学习

    相对而言,`ReentrantLock`是Java `java.util.concurrent.locks`包中提供的一个可重入锁,它是`Lock`接口的实现之一。`ReentrantLock`基于AQS(Abstract Queued Synchronizer,队列同步器)进行实现,支持公平锁和非...

Global site tag (gtag.js) - Google Analytics