`
manzhizhen
  • 浏览: 294146 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

信号量Semaphore初探

    博客分类:
  • Java
阅读更多

         信号量Semaphorejava.util.concurrent包下一个常用的同步工具类,它维护了一个许可集,可以理解成资源数,可以通过acquire操作来获取一个资源,并通过release来释放一个资源,但需要注意的是,release来释放资源前不一定要先通过acquire来获取一个资源,如果不断的release而不进行acquire将导致资源数虚增,所以一定得我们自己来保证使用的正确性。

         我们经常用信号量来管理可重复使用的资源,比如数据库连接、线程等,因为这些资源都有着可预估的上限,所以我们在初始化Semaphore时设定的许可数和我们需要管理的资源数一致,获取一个资源时就通过acquire来获取一个许可,如果没有可用资源,则acquire将阻塞,释放一个资源时,通过release来释放一个许可。用Semaphore来限制操作的并发访问程度也类似。

        但有一点需要注意,Semaphore没有直接提供更新许可总数的方法,虽然你可以单独通过acquirerelease来特意的减少或增加许可的总量,但这样做会让人感到奇怪。那么为什么Semaphore没有单独同重设信号量数量的方法呢?直接把AQSsetState方法暴露出来不就行的吗?因为setState操作如果发生在在某些使用该Semaphore的线程还没有走完整个信号量的获取和释放的流程时,将会直接导致state值的不准确。现在说可能让人不太理解,不用担心,看完本文,你就懂了。有人会想到当需要修改许可总数时,我再重新new一个Semaphore出来不就行了?比如像下面:

private volatile Semaphore jdbcConnection = new Semaphore(10);

public void resetJdbcConnection(int jdbcConnectionAmount) {

    jdbcConnection = new Semaphore(jdbcConnectionAmount);

}

这里通过提供resetJdbcConnection方法来让外部可以修改jdbcConnection的许可数目,注意,这里的jdbcConnection必须是volatile这样暴力修改是有风险的,因为你在修改时jdbcConnection很可能正在被使用,比如它进行了jdbcConnection.acquire()操作后,你把jdbcConnection给更换成另一个新的Semaphore,所以在你使用jdbcConnection.release()来释放一个许可时,是在新的Semaphore进行release操作,所以新的Semaphore的许可数量被莫名的+1了。有一种解决办法是用局部变量记录下操作acquire时的Semaphore,并在进行release时使用该局部变量来进行release,这样保证了acquirerelease是在同一个Semaphore上操作,这种方法简单有效,适合绝大多数场景。之所以说Semaphore没有直接提供更新许可总数的方法,是因为Semaphore确有一个永久减少许可总数的方法,即:

protected void reducePermits(int reduction) {

    if (reduction < 0) throw new IllegalArgumentException();

    sync.reducePermits(reduction);

}

很显然,它需要子类来继承使用,别妄想将reduction传入负数来使许可数增加,因为sync.reducePermits(reduction);中对reduction的值做了限制:

final void reducePermits(int reductions) {

    for (;;) {

        int current = getState();

        int next = current - reductions;

        if (next > current) // underflow

            throw new Error("Permit count underflow");

        if (compareAndSetState(current, next))

            return;

    }

}

有人会好奇,为啥Semaphore提供了减少许可数的入口,但未提供增加许可数的入口?这个我暂时还未找到原因。

 

    现在我们来分析Semaphore的内部实现,像绝大多数并发工具类一样,Semaphore也依赖于AQSAbstractQueuedSynchronizer),关于AQS的简单分析,可以参考我的另一篇博文:http://manzhizhen.iteye.com/blog/2305890。你会很自然的想到,初始化时许可的总数就是保存在AQSstate属性中的:

Sync(int permits) {

    setState(permits);

}

 

final int getPermits() {

    return getState();

}

SyncSemaphore中实现的AQS的内部类,我们现在来看下Sync中关于非公平获取信号量和释放信号量的两个方法的实现:

final int nonfairTryAcquireShared(int acquires) {

    for (;;) {

        int available = getState();

        int remaining = available - acquires;

        if (remaining < 0 ||

            compareAndSetState(available, remaining))

            return remaining;

    }

}

 

protected final boolean tryReleaseShared(int releases) {

    for (;;) {

        int current = getState();

        int next = current + releases;

        if (next < current) // overflow

            throw new Error("Maximum permit count exceeded");

        if (compareAndSetState(current, next))

            return true;

    }

}

nonfairTryAcquireShared方法中可以看出,当需要获取acquires数量的许可时,先看可用的许可够不够,如果不够(remaining < 0),则直接返回还差的许可数的负数值,如果够,则从可用许可数中减去acquires,并返回剩余可用的许可数。由于nonfairTryAcquireShared是非阻塞的,所以它直接在Semaphore中的tryAcquire非阻塞方法中使用:

public boolean tryAcquire() {

    return sync.nonfairTryAcquireShared(1) >= 0;

}

public boolean tryAcquire(int permits) {

    if (permits < 0) throw new IllegalArgumentException();

    return sync.nonfairTryAcquireShared(permits) >= 0;

}

 

可见,对于tryAcquire的非阻塞方法,Semaphore的公平模式和非公平模式下的实现都是一样的,它在能获取到足够许可时不需要进入队列而是直接拿到走人,不能获取足够许可时就直接返回,是非公平的!注意,对于tryAcquire的阻塞方法还是有公平和非公平之分的。tryReleaseShared是用来释放许可数,我们可以看内部实现也很简单,就是不断的进行CAS操作指导成功将releases数目的许可数加回到当前可用许可数中。前面已经说过,Seamphore无法去校验你获取许可和释放许可是否一一对应,因为获取和释放都是直接在AQSstate上操作的,所以操作一段时间后,连AQS自己都忘记最初的state值是啥了,所以当然无法在中途来校验获取和释放是否正确,即使知道state的初始值,也很难在交错的获取和释放许可的操作过程中做一致性检查。

        看过SemaphoreAPI文档都知道Semaphore中由公平(fair)和非公平(nonfair)两种模式,这两种模式在Semaphore创建时确定,中途不能修改,默认是非公平的。Semaphore的构造函数如下:

public Semaphore(int permits, boolean fair) {

    sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

从名字就可以看出,FairSync代表公平的Sync实现,而NonfairSync代表非公平的Sync实现,从名字也看出内部类FairSyncNonfairSync都继承自上面提到的抽象类Sync那么公平和非公平的含义到底是什么呢?看过我讲述AQS博文的朋友应该知道,我说过,只要线程由于获取资源数失败而进入队列中后,就一定得等前面的节点获取完锁才有机会尝试获取锁,也就是说在AQS队列中的线程绝对是公平的,因为队列本来就是先进先出,即先到先得。但如果你想获取的资源数现在就有,那么即使现在队列中有线程排队在等,你也可以不用进入队列而直接拿到你想要的资源,这就是非公平!所以,你很自然的想到,公平的做法就是不管此时有没有可用的许可,只要队列中有线程在等,你就得给我乖乖去排队,没错,公平就是这样的!我们先看非公平的NonfairSync实现:

static final class NonfairSync extends Sync {

    private static final long serialVersionUID = -2694183684443567898L;

 

    NonfairSync(int permits) {

        super(permits);

    }

 

    protected int tryAcquireShared(int acquires) {

        return nonfairTryAcquireShared(acquires);

    }

}

tryAcquireShared方法调用的nonfairTryAcquireShared是在父类Sync中实现的,前面已经给出了代码实现,这里就不在多说了。我们再看看公平的FairSync实现:

static final class FairSync extends Sync {

    private static final long serialVersionUID = 2014338818796000944L;

 

    FairSync(int permits) {

        super(permits);

    }

 

    protected int tryAcquireShared(int acquires) {

        for (;;) {

            if (hasQueuedPredecessors())

                return -1;

            int available = getState();

            int remaining = available - acquires;

            if (remaining < 0 ||

                compareAndSetState(available, remaining))

                return remaining;

        }

    }

}

FairSync的获取许可数方法tryAcquireSharednonfairTryAcquireShared比起来唯一的特别之处就是hasQueuedPredecessors方法的调用,而hasQueuedPredecessorsAQS中实现的:

public final boolean hasQueuedPredecessors() {

    // The correctness of this depends on head being initialized

    // before tail and on head.next being accurate if the current

    // thread is first in queue.

    Node t = tail; // Read fields in reverse initialization order

    Node h = head;

    Node s;

    return h != t &&

        ((s = h.next) == null || s.thread != Thread.currentThread());

}

从方法名和实现可以看出,hasQueuedPredecessors主要用来判断AQS队列中是否还有等待的线程节点,如果有,并且不是当前的线程节点,则返回true,所以,对于tryAcquireShared来说,此时都不进行资源获取的尝试就直接返回-1表明资源获取失败了。

 

现在许可数、公平和非公平都介绍完了,我们来看看获取许可数的相关方法。acquire将会在获取许可之前一直阻塞,或者被中断,我们看其内部实现:

public void acquire() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);

}

 

public void acquire(int permits) throws InterruptedException {

    if (permits < 0) throw new IllegalArgumentException();

    sync.acquireSharedInterruptibly(permits);

}

 可以看出,它直接调用了AQS中的acquireSharedInterruptibly的实现,acquireSharedInterruptibly的实现在我的AQS博文中已经分析过,这里不再阐述。我们再来看看获取信号的限时阻塞方法tryAcquire的实现:

public boolean tryAcquire(long timeout, TimeUnit unit)

    throws InterruptedException {

    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)

    throws InterruptedException {

    if (permits < 0) throw new IllegalArgumentException();

    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

}

可见,Semaphore也没对其有特别的实现,而是直接调用AQS中的tryAcquireSharedNanos方法,tryAcquireSharedNanos在我的AQS博文中已经做过简单的介绍,这里不再阐述。

 

       可以看出,只要能弄懂AQS,你就可以明白Semaphore的核心实现!

分享到:
评论

相关推荐

    使用信号量(Semaphore)实现线程的同步

    信号量(Semaphore)是操作系统中一种重要的同步机制,它用于管理多个线程对共享资源的访问,以防止并发执行时产生竞态条件。在多线程编程中,当多个线程试图同时访问一个有限的资源时,信号量可以帮助我们协调这些...

    信号量Semaphore了解过吗?

    信号量Semaphore,这是一个在多线程编程中至关重要的同步机制,尤其在操作系统设计和并发编程领域,它扮演着协调多个线程对共享资源访问的角色。面试中被问及但回答不出,可能会显示出对并发控制理解的不足,因此,...

    python线程信号量semaphore使用解析

    多线程同时运行,能提高程序的运行效率,但是并非线程越多越好,而semaphore信号量可以通过内置计数器来控制同时运行线程的数量,启动线程(消耗信号量)内置计数器会自动减一,线程结束(释放信号量)内置计数器会自动...

    Java 信号量Semaphore的实现

    Java 信号量Semaphore的实现 Java 信号量Semaphore是Java并发编程中的一种机制,用于控制多个线程的并发执行。Semaphore的实现主要是通过计数器来实现的,每个Semaphore对象都维护着一个计数器,用于记录当前可用的...

    信号量同步等待机制 semaphore wait-and-signal

    ### 信号量同步等待机制(Semaphore Wait-and-Signal) #### 一、引言 在多线程编程中,临界资源的访问管理是一项至关重要的任务。为了确保数据的一致性和程序的正确执行,必须采取有效的机制来防止多个线程同时...

    c++多线程同步——信号量

    信号量(Semaphore)是实现多线程同步的一种有效工具,常用于控制对共享资源的访问。在这个名为"Mthread11"的MFC工程中,我们可以看到如何在C++环境中应用信号量来解决多线程间的同步问题。 首先,我们需要理解什么...

    信号量(Semaphore)

    信号量(Semaphore)哲学家进餐问题(the dining philosophers problem)---------------------------程序

    信号量的应用

    信号量可以分类为整型信号量(integer semaphore)和记录型信号量(record semaphore)。整型信号量是最简单的信号量,记录型信号量则在整型信号量的基础上增加了一个进程等待队列。 信号量的创建需要使用 semget ...

    JAVA多线程--信号量(Semaphore)_.docx

    JAVA多线程--信号量(Semaphore) 信号量(Semaphore)是一种多线程环境下的设施,负责协调各个线程,以保证它们能够正确、合理地使用公共资源。从概念上讲,信号量维护了一个许可集。 信号量的类型有两种:单值信号...

    c语言信号量的使用实例

    ### c语言信号量的使用实例 #### 一、信号量的基本概念 信号量是一种用于解决进程间同步问题的机制,在多线程或多进程环境中尤为重要。它通过控制共享资源的访问来避免竞态条件,确保数据的一致性。信号量本质上是...

    semaphore控制多线程循序执行

    Semaphore(信号量)是一种经典的同步机制,它源自于荷兰计算机科学家Edsger W. Dijkstra提出的银行家算法。本示例中,我们将深入探讨如何使用Semaphore来控制多线程的循序执行。 Semaphore是一种计数信号量,它...

    rt-thread信号量_holecev_RT-Thread_rtthread信号量_信号量_

    在RT-Thread中,信号量(Semaphore)是多线程间同步和资源管理的重要工具,它允许线程之间进行协作,以实现对共享资源的有序访问。"holecev"可能是作者或项目的别名,而"RT-Thread_rtthread信号量_信号量_"则是强调...

    Java信号量Semaphore

    Semaphore  Semaphore分为单值和多值两种,前者只能被一个线程获得,...单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场

    JAVA 多线程之信号量(Semaphore)实例详解

    **JAVA 多线程之信号量Semaphore实例详解** 在Java多线程编程中,信号量Semaphore是一种非常重要的同步工具,用于控制对公共资源的访问。Semaphore类位于`java.util.concurrent`包下,它允许我们限制同时访问特定...

    vxworks多任务(task和semaphore信号量及事件)例程.zip

    资源包含了vxworks多任务(task和semaphore信号量及事件)的相关编程例子,通过创建多任务的简单例子,帮助大家了解taskSpawn函数、信号量semaphore、以及事件event的使用方法。

    QT 下 信号量使用

    在QT框架中,信号量(Semaphore)是一种非常重要的同步机制,它源于进程间通信(IPC)的概念,并在多线程编程中广泛使用。信号量允许我们控制对共享资源的访问,确保同一时间只有一个线程或者有限数量的线程能够访问...

    易语言多线程控制:信号量控制线程数量

    信号量(Semaphore)是一种经典的同步机制,用于控制对共享资源的访问。在易语言中,我们可以利用其提供的信号量API来实现线程间的同步与互斥。信号量值可以用来表示资源的可用数量,当线程试图获取一个资源时,如果...

    MFC关于信号量的例子

    在IT行业中,多线程编程是提升程序性能和并发处理能力的重要手段,而信号量(Semaphore)则是多线程同步中的关键概念。本篇将详细探讨MFC(Microsoft Foundation Classes)框架下如何利用信号量来实现线程间的同步与...

    Minix中实现信号量

    信号量(Semaphore)是操作系统中一种重要的同步机制,用于解决进程间的资源竞争问题。在Minix中实现信号量对于理解和优化多进程协作至关重要。 信号量是一种计数器,可以用于保护共享资源。在Minix中,信号量分为...

Global site tag (gtag.js) - Google Analytics