信号量Semaphore是java.util.concurrent包下一个常用的同步工具类,它维护了一个许可集,可以理解成资源数,可以通过acquire操作来获取一个资源,并通过release来释放一个资源,但需要注意的是,release来释放资源前不一定要先通过acquire来获取一个资源,如果不断的release而不进行acquire将导致资源数虚增,所以一定得我们自己来保证使用的正确性。
我们经常用信号量来管理可重复使用的资源,比如数据库连接、线程等,因为这些资源都有着可预估的上限,所以我们在初始化Semaphore时设定的许可数和我们需要管理的资源数一致,获取一个资源时就通过acquire来获取一个许可,如果没有可用资源,则acquire将阻塞,释放一个资源时,通过release来释放一个许可。用Semaphore来限制操作的并发访问程度也类似。
但有一点需要注意,Semaphore没有直接提供更新许可总数的方法,虽然你可以单独通过acquire或release来特意的减少或增加许可的总量,但这样做会让人感到奇怪。那么为什么Semaphore没有单独同重设信号量数量的方法呢?直接把AQS的setState方法暴露出来不就行的吗?因为setState操作如果发生在在某些使用该Semaphore的线程还没有走完整个信号量的获取和释放的流程时,将会直接导致state值的不准确。现在说可能让人不太理解,不用担心,看完本文,你就懂了。有人会想到当需要修改许可总数时,我再重新new一个Semaphore出来不就行了?比如像下面:
private volatile Semaphore jdbcConnection = new Semaphore(10);
public void resetJdbcConnection(int jdbcConnectionAmount) {
jdbcConnection = new Semaphore(jdbcConnectionAmount);
}
这里通过提供resetJdbcConnection方法来让外部可以修改jdbcConnection的许可数目,注意,这里的jdbcConnection必须是volatile,这样暴力修改是有风险的,因为你在修改时jdbcConnection很可能正在被使用,比如它进行了jdbcConnection.acquire()操作后,你把jdbcConnection给更换成另一个新的Semaphore,所以在你使用jdbcConnection.release()来释放一个许可时,是在新的Semaphore进行release操作,所以新的Semaphore的许可数量被莫名的+1了。有一种解决办法是用局部变量记录下操作acquire时的Semaphore,并在进行release时使用该局部变量来进行release,这样保证了acquire和release是在同一个Semaphore上操作,这种方法简单有效,适合绝大多数场景。之所以说Semaphore没有直接提供更新许可总数的方法,是因为Semaphore确有一个永久减少许可总数的方法,即:
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
很显然,它需要子类来继承使用,别妄想将reduction传入负数来使许可数增加,因为sync.reducePermits(reduction);中对reduction的值做了限制:
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
有人会好奇,为啥Semaphore提供了减少许可数的入口,但未提供增加许可数的入口?这个我暂时还未找到原因。
现在我们来分析Semaphore的内部实现,像绝大多数并发工具类一样,Semaphore也依赖于AQS(AbstractQueuedSynchronizer),关于AQS的简单分析,可以参考我的另一篇博文:http://manzhizhen.iteye.com/blog/2305890。你会很自然的想到,初始化时许可的总数就是保存在AQS的state属性中的:
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
Sync是Semaphore中实现的AQS的内部类,我们现在来看下Sync中关于非公平获取信号量和释放信号量的两个方法的实现:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
从nonfairTryAcquireShared方法中可以看出,当需要获取acquires数量的许可时,先看可用的许可够不够,如果不够(remaining < 0),则直接返回还差的许可数的负数值,如果够,则从可用许可数中减去acquires,并返回剩余可用的许可数。由于nonfairTryAcquireShared是非阻塞的,所以它直接在Semaphore中的tryAcquire非阻塞方法中使用:
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
可见,对于tryAcquire的非阻塞方法,Semaphore的公平模式和非公平模式下的实现都是一样的,它在能获取到足够许可时不需要进入队列而是直接拿到走人,不能获取足够许可时就直接返回,是非公平的!注意,对于tryAcquire的阻塞方法还是有公平和非公平之分的。tryReleaseShared是用来释放许可数,我们可以看内部实现也很简单,就是不断的进行CAS操作指导成功将releases数目的许可数加回到当前可用许可数中。前面已经说过,Seamphore无法去校验你获取许可和释放许可是否一一对应,因为获取和释放都是直接在AQS的state上操作的,所以操作一段时间后,连AQS自己都忘记最初的state值是啥了,所以当然无法在中途来校验获取和释放是否正确,即使知道state的初始值,也很难在交错的获取和释放许可的操作过程中做一致性检查。
看过Semaphore的API文档都知道Semaphore中由公平(fair)和非公平(nonfair)两种模式,这两种模式在Semaphore创建时确定,中途不能修改,默认是非公平的。Semaphore的构造函数如下:
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
从名字就可以看出,FairSync代表公平的Sync实现,而NonfairSync代表非公平的Sync实现,从名字也看出内部类FairSync和NonfairSync都继承自上面提到的抽象类Sync。那么公平和非公平的含义到底是什么呢?看过我讲述AQS博文的朋友应该知道,我说过,只要线程由于获取资源数失败而进入队列中后,就一定得等前面的节点获取完锁才有机会尝试获取锁,也就是说在AQS队列中的线程绝对是公平的,因为队列本来就是先进先出,即先到先得。但如果你想获取的资源数现在就有,那么即使现在队列中有线程排队在等,你也可以不用进入队列而直接拿到你想要的资源,这就是非公平!所以,你很自然的想到,公平的做法就是不管此时有没有可用的许可,只要队列中有线程在等,你就得给我乖乖去排队,没错,公平就是这样的!我们先看非公平的NonfairSync实现:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
tryAcquireShared方法调用的nonfairTryAcquireShared是在父类Sync中实现的,前面已经给出了代码实现,这里就不在多说了。我们再看看公平的FairSync实现:
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
FairSync的获取许可数方法tryAcquireShared和nonfairTryAcquireShared比起来唯一的特别之处就是hasQueuedPredecessors方法的调用,而hasQueuedPredecessors是AQS中实现的:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
从方法名和实现可以看出,hasQueuedPredecessors主要用来判断AQS队列中是否还有等待的线程节点,如果有,并且不是当前的线程节点,则返回true,所以,对于tryAcquireShared来说,此时都不进行资源获取的尝试就直接返回-1表明资源获取失败了。
现在许可数、公平和非公平都介绍完了,我们来看看获取许可数的相关方法。acquire将会在获取许可之前一直阻塞,或者被中断,我们看其内部实现:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
可以看出,它直接调用了AQS中的acquireSharedInterruptibly的实现,acquireSharedInterruptibly的实现在我的AQS博文中已经分析过,这里不再阐述。我们再来看看获取信号的限时阻塞方法tryAcquire的实现:
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
可见,Semaphore也没对其有特别的实现,而是直接调用AQS中的tryAcquireSharedNanos方法,tryAcquireSharedNanos在我的AQS博文中已经做过简单的介绍,这里不再阐述。
可以看出,只要能弄懂AQS,你就可以明白Semaphore的核心实现!
相关推荐
信号量(Semaphore)是操作系统中一种重要的同步机制,它用于管理多个线程对共享资源的访问,以防止并发执行时产生竞态条件。在多线程编程中,当多个线程试图同时访问一个有限的资源时,信号量可以帮助我们协调这些...
信号量Semaphore,这是一个在多线程编程中至关重要的同步机制,尤其在操作系统设计和并发编程领域,它扮演着协调多个线程对共享资源访问的角色。面试中被问及但回答不出,可能会显示出对并发控制理解的不足,因此,...
多线程同时运行,能提高程序的运行效率,但是并非线程越多越好,而semaphore信号量可以通过内置计数器来控制同时运行线程的数量,启动线程(消耗信号量)内置计数器会自动减一,线程结束(释放信号量)内置计数器会自动...
Java 信号量Semaphore的实现 Java 信号量Semaphore是Java并发编程中的一种机制,用于控制多个线程的并发执行。Semaphore的实现主要是通过计数器来实现的,每个Semaphore对象都维护着一个计数器,用于记录当前可用的...
### 信号量同步等待机制(Semaphore Wait-and-Signal) #### 一、引言 在多线程编程中,临界资源的访问管理是一项至关重要的任务。为了确保数据的一致性和程序的正确执行,必须采取有效的机制来防止多个线程同时...
信号量(Semaphore)是实现多线程同步的一种有效工具,常用于控制对共享资源的访问。在这个名为"Mthread11"的MFC工程中,我们可以看到如何在C++环境中应用信号量来解决多线程间的同步问题。 首先,我们需要理解什么...
信号量(Semaphore)哲学家进餐问题(the dining philosophers problem)---------------------------程序
信号量可以分类为整型信号量(integer semaphore)和记录型信号量(record semaphore)。整型信号量是最简单的信号量,记录型信号量则在整型信号量的基础上增加了一个进程等待队列。 信号量的创建需要使用 semget ...
JAVA多线程--信号量(Semaphore) 信号量(Semaphore)是一种多线程环境下的设施,负责协调各个线程,以保证它们能够正确、合理地使用公共资源。从概念上讲,信号量维护了一个许可集。 信号量的类型有两种:单值信号...
### c语言信号量的使用实例 #### 一、信号量的基本概念 信号量是一种用于解决进程间同步问题的机制,在多线程或多进程环境中尤为重要。它通过控制共享资源的访问来避免竞态条件,确保数据的一致性。信号量本质上是...
Semaphore(信号量)是一种经典的同步机制,它源自于荷兰计算机科学家Edsger W. Dijkstra提出的银行家算法。本示例中,我们将深入探讨如何使用Semaphore来控制多线程的循序执行。 Semaphore是一种计数信号量,它...
在RT-Thread中,信号量(Semaphore)是多线程间同步和资源管理的重要工具,它允许线程之间进行协作,以实现对共享资源的有序访问。"holecev"可能是作者或项目的别名,而"RT-Thread_rtthread信号量_信号量_"则是强调...
Semaphore Semaphore分为单值和多值两种,前者只能被一个线程获得,...单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场
**JAVA 多线程之信号量Semaphore实例详解** 在Java多线程编程中,信号量Semaphore是一种非常重要的同步工具,用于控制对公共资源的访问。Semaphore类位于`java.util.concurrent`包下,它允许我们限制同时访问特定...
资源包含了vxworks多任务(task和semaphore信号量及事件)的相关编程例子,通过创建多任务的简单例子,帮助大家了解taskSpawn函数、信号量semaphore、以及事件event的使用方法。
在QT框架中,信号量(Semaphore)是一种非常重要的同步机制,它源于进程间通信(IPC)的概念,并在多线程编程中广泛使用。信号量允许我们控制对共享资源的访问,确保同一时间只有一个线程或者有限数量的线程能够访问...
信号量(Semaphore)是一种经典的同步机制,用于控制对共享资源的访问。在易语言中,我们可以利用其提供的信号量API来实现线程间的同步与互斥。信号量值可以用来表示资源的可用数量,当线程试图获取一个资源时,如果...
在IT行业中,多线程编程是提升程序性能和并发处理能力的重要手段,而信号量(Semaphore)则是多线程同步中的关键概念。本篇将详细探讨MFC(Microsoft Foundation Classes)框架下如何利用信号量来实现线程间的同步与...
信号量(Semaphore)是操作系统中一种重要的同步机制,用于解决进程间的资源竞争问题。在Minix中实现信号量对于理解和优化多进程协作至关重要。 信号量是一种计数器,可以用于保护共享资源。在Minix中,信号量分为...