本文的主要内容是认识CountDownLatch、Semaphore、FutureTask的源码,这三个类在AQS中都用的是共享模式的,而且是可中断的,不同的是各自的请求锁和释放锁的操作,先说他们的不同点,在来说相同点。
首先看看CountDownLatch(闭锁),这个类在构建的时候初始一个阀值N,每调用一次countDown()后,N将减一直到为0时,线程才可以通过,下面来看看请求锁的代码
public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}
这里的state就是构建的时候指定的阀值,当state=0时,线程可以通过,否则进入同步队列等待。再来看看释放锁的操作
public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
每执行一次countDown()方法就将state减一,如果state=0就返回ture,那么他将唤醒等待队列中第一个等待者。将这两部分串起来理解就是这个类的特点了。
接下来看看Semaphore(信号量),这个类有两种状态:公平和非公平,这里介绍的是非公平的实现。信号量的意义是指在构建的时候指定许可数N,在超过N个线程请求后,线程将进入同步队列等待,当然它也提供了增加许可的方法,现在来看看请求许可的代码
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
这是一个很典型的非阻塞算法,每次请求的时候将当前许可数减一,如果剩余许可remaining>=0,将执行cas操作,然后返回remaining,如果剩余许可大于等于0,线程可通过,否则进入同步队列等待。
再来看看释放许可的代码
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int p = getState();
if (compareAndSetState(p, p + releases))
return true;
}
}
将许可数state增加指定的值,cas操作成功后返回,否则重试。将这两部分的代码结合起来理解就是信号量要表达的意思了。
接下来看看FutureTask,这是一种异步任务的实现,它的特点就是在任务执行完之前去获取任务的结果,线程将进入同步队列等待直到任务完成,如果任务已完成直接获得结果。
在FutureTask的实现中,它自己定义了三个状态
/** State value representing that task is running */
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
具体的含义有英文解释,定义的值是1、2、4是为了方便位运算,下面来看看请求锁的代码
/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}
方法innerIsDone()表示任务是否完成,包括RAN和CANCELLED,任务在刚新建的时候的状态是为0,执行的过程中为
1,执行完为2,被删除后状态为4。下面再来看看"释放锁"的代码
/**
* Implements AQS base release to always signal after setting
* final done status by nulling runner thread.
*/
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
始终返回true,结合这两部分代码就可以理解FutureTask的特点了。
最后来看看他们的相同部分:共享模式的入队操作,代码如下
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { //标记1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}
代码中同独占模式最大的不同点在于标记1的位置,条件判断中变量r在闭锁中表示state是否为0(true返回1,否则为-1),信号量中表示剩余信号量的值,FutureTask中表示任务是否完成(true返回1,否则返回-1),接下来看看
setHeadAndPropagate(node, r)的代码
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
将当前节点设置为头结点,如果有足够的许可,并且后面还有等待节点就唤醒下一个节点,这一点和独占模式比起来就有一点共享的意思,比如CountDownLatch,只要countDown次数足够全部的等待线程都会唤醒;Semaphore,只要还有信号量就会继续唤醒等待线程;FutureTask,只要任务完成,所有的等待线程都将唤醒。
这里的中断机制是,线程被中断后,先把节点从同步队列中出队后抛出中断异常,java中中断就是这种模式的:线程A中断线程B后,由线程B选择适当的时候响应中断,这个操作由线程B自己处理的。
以上三个类都是AQS在共享模式下的应用。
分享到:
相关推荐
有以下类的实例: ThreadPool ScheduledThread CyclicBarrier BlockingQueue CountDownLatch FutureTask CompletionService Semaphore
CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 ...
4. **同步工具类**:CountDownLatch、CyclicBarrier、Semaphore和FutureTask等工具类为并发编程提供了更多的协调机制。CountDownLatch用于一次性释放多个线程,CyclicBarrier则允许一组线程等待所有线程到达屏障点后...
java forkjoin 源码 JDK源码学习: Java 容器 ArrayList LinkedList PriorityQueue ...CountDownLatch CyclicBarrier Semaphore ForkJoin FutureTask BlockingQueue Spring AOP IOC 面向面经复习
AQS相关应用(CountDownLatch、CyclicBarrier、Semaphore等),executor(ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask等),collection(ConcurrentHashMap、CopyOnWriteArrayList等), ...
- 并发工具类:Semaphore,CyclicBarrier,CountDownLatch,FutureTask等。 6. **IO流** - 字节流与字符流:InputStream、OutputStream、Reader、Writer的区别。 - 流的分类:节点流与处理流,输入流与输出流。 ...
- CountDownLatch的放行是由第三方(持有计数器的线程)控制,而CyclicBarrier的放行是由参与等待的线程自身控制。 - CountDownLatch的放行条件是计数器的值大于等于线程数,而CyclicBarrier的放行条件是计数器值...
CountDownLatch (减少计数器) CyclicBarrier(加法计数器) Semaphore(信号量,流量控制) ReentrantReadWriteLock (读写锁) BlockingQueue(阻塞队列) 线程池 池化技术 线程池的优势 线程池的特点 线程池三大...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
5. **CountDownLatch**:`CountDownLatch`是一个计数器,通常用于让一个线程等待其他线程完成操作。计数器减到零时,所有等待的线程都可以继续执行。`TestCountDownLatch.java`可能展示了如何使用`CountDownLatch`来...
在JUC中,核心组件包括`ExecutorService`、`Semaphore`、`CountDownLatch`、`CyclicBarrier`、`Future`、`BlockingQueue`等。这些组件提供了丰富的线程管理、同步、通信和协调机制。 1. `ExecutorService`:它是...
- **FutureTask**:代表一个异步计算的结果,它实现了`Future`接口,允许查询计算是否完成,获取结果或者取消计算。 2. **使用场景** - **CountDownLatch**:适用于需要等待多个线程执行完毕后再继续的场景,如...
这个库包含了许多实用的工具类,如`FutureTask`、`CountDownLatch`、`Semaphore`等,这些工具类可以帮助开发者更好地处理并发问题,提高程序的性能和可扩展性。 主要特点如下: 1. 兼容性:这个库兼容JDK 1.5及以上...
如线程池(ExecutorService)、并发容器(ConcurrentHashMap、CopyOnWriteArrayList等)、锁机制(ReentrantLock、Semaphore等)、原子类(AtomicInteger、AtomicReference等)以及并发工具类(CountDownLatch、...
还需要考虑并发策略,如同步机制(synchronized、Lock)、并发容器(ConcurrentHashMap、CopyOnWriteArrayList)等,以及异常处理和线程间的通信(wait/notify、CountDownLatch、CyclicBarrier、Semaphore)。...