线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
|
|
|
|
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
|
|
|
|
|
- corePoolSize
线程池维护线程的最少数量
- maximumPoolSiz
线程池维护线程的最大数量
- keepAliveTime
线程池维护线程所允许的空闲时间
- unit
线程池维护线程所允许的空闲时间的单位
- workQueue
线程池所使用的缓冲队列
- handler
线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
- 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
- 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
- ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
- ThreadPoolExecutor.CallerRunsPolicy()
由调用者执行这个任务
- ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
- ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
二、一般用法举例
|
|
|
|
package cn.simplelife.exercise;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPool {
private static int produceTaskSleepTime = 2;
public static void main(String[] args) {
//构造一个线程池
ThreadPoolExecutor producerPool = new ThreadPoolExecutor(2, 4, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
//每隔produceTaskSleepTime的时间向线程池派送一个任务。
int i=1;
while(true){
try {
Thread.sleep(produceTaskSleepTime);
String task = "task@ " + i;
System.out.println("put " + task);
producerPool.execute(new ThreadPoolTask(task));
i++;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
|
|
|
|
|
|
|
|
|
package cn.simplelife.exercise;
import java.io.Serializable;
/**
* 线程池执行的任务
* @author hdpan
*/
public class ThreadPoolTask implements Runnable,Serializable{
//JDK1.5中,每个实现Serializable接口的类都推荐声明这样的一个ID
private static final long serialVersionUID = 0;
private static int consumeTaskSleepTime = 2000;
private Object threadPoolTaskData;
ThreadPoolTask(Object tasks){
this.threadPoolTaskData = tasks;
}
//每个任务的执行过程,现在是什么都没做,除了print和sleep,:)
public void run(){
System.out.println("start .."+threadPoolTaskData);
try {
//便于观察现象,等待一段时间
Thread.sleep(consumeTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
threadPoolTaskData = null;
}
}
|
**************下面属于Brian Goetz (brian@quiotix.com)Quiotix所写**********
Java™ 5.0 第一次让使用 Java 语言开发非阻塞算法成为可能,java.util.concurrent
包充分地利用了这个功能。非阻塞算法属于并发算法,它们可以安全地派生它们的线程,不通过锁定派生,而是通过低级的原子性的硬件原生形式 —— 例如比较和交换。非阻塞算法的设计与实现极为困难,但是它们能够提供更好的吞吐率,对生存问题(例如死锁和优先级反转)也能提供更好的防御。在这期的 Java 理论与实践 中,并发性大师 Brian Goetz 演示了几种比较简单的非阻塞算法的工作方式。
<!----><!----><!---->
在不只一个线程访问一个互斥的变量时,所有线程都必须使用同步,否则就可能会发生一些非常糟糕的事情。Java 语言中主要的同步手段就是 synchronized
关键字(也称为内在锁),它强制实行互斥,确保执行 synchronized
块的线程的动作,能够被后来执行受相同锁保护的 synchronized
块的其他线程看到。在使用得当的时候,内在锁可以让程序做到线程安全,但是在使用锁定保护短的代码路径,而且线程频繁地争用锁的时候,锁定可能成为相当繁重的操作。
在 “流行的原子” 一文中,我们研究了原子变量,原子变量提供了原子性的读-写-修改操作,可以在不使用锁的情况下安全地更新共享变量。原子变量的内存语义与 volatile 变量类似,但是因为它们也可以被原子性地修改,所以可以把它们用作不使用锁的并发算法的基础。
非阻塞的计数器
清单 1 中的 Counter
是线程安全的,但是使用锁的需求带来的性能成本困扰了一些开发人员。但是锁是必需的,因为虽然增加看起来是单一操作,但实际是三个独立操作的简化:检索值,给值加 1,再写回值。(在 getValue
方法上也需要同步,以保证调用 getValue
的线程看到的是最新的值。虽然许多开发人员勉强地使自己相信忽略锁定需求是可以接受的,但忽略锁定需求并不是好策略。)
在多个线程同时请求同一个锁时,会有一个线程获胜并得到锁,而其他线程被阻塞。JVM 实现阻塞的方式通常是挂起阻塞的线程,过一会儿再重新调度它。由此造成的上下文切换相对于锁保护的少数几条指令来说,会造成相当大的延迟。
清单 1. 使用同步的线程安全的计数器
public final class Counter {
private long value = 0;
public synchronized long getValue() {
return value;
}
public synchronized long increment() {
return ++value;
}
}
|
清单 2 中的 NonblockingCounter
显示了一种最简单的非阻塞算法:使用 AtomicInteger
的 compareAndSet()
(CAS)方法的计数器。compareAndSet()
方法规定 “将这个变量更新为新值,但是如果从我上次看到这个变量之后其他线程修改了它的值,那么更新就失败”(请参阅 “流行的原子” 获得关于原子变量以及 “比较和设置” 的更多解释。)
清单 2. 使用 CAS 的非阻塞算法
public class NonblockingCounter {
private AtomicInteger value;
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
while (!value.compareAndSet(v, v + 1));
return v + 1;
}
}
|
原子变量类之所以被称为原子的,是因为它们提供了对数字和对象引用的细粒度的原子更新,但是在作为非阻塞算法的基本构造块的意义上,它们也是原子的。非阻塞算法作为科研的主题,已经有 20 多年了,但是直到 Java 5.0 出现,在 Java 语言中才成为可能。
现代的处理器提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet()
就用这些代替了锁定。(如果要做的只是递增计数器,那么 AtomicInteger
提供了进行递增的方法,但是这些方法基于 compareAndSet()
,例如 NonblockingCounter.increment()
)。
非阻塞版本相对于基于锁的版本有几个性能优势。首先,它用硬件的原生形态代替 JVM 的锁定代码路径,从而在更细的粒度层次上(独立的内存位置)进行同步,失败的线程也可以立即重试,而不会被挂起后重新调度。更细的粒度降低了争用的机会,不用重新调度就能重试的能力也降低了争用的成本。即使有少量失败的 CAS 操作,这种方法仍然会比由于锁争用造成的重新调度快得多。
NonblockingCounter
这个示例可能简单了些,但是它演示了所有非阻塞算法的一个基本特征 —— 有些算法步骤的执行是要冒险的,因为知道如果 CAS 不成功可能不得不重做。非阻塞算法通常叫作乐观算法,因为它们继续操作的假设是不会有干扰。如果发现干扰,就会回退并重试。在计数器的示例中,冒险的步骤是递增 —— 它检索旧值并在旧值上加一,希望在计算更新期间值不会变化。如果它的希望落空,就会再次检索值,并重做递增计算。
非阻塞堆栈
非阻塞算法稍微复杂一些的示例是清单 3 中的 ConcurrentStack
。ConcurrentStack
中的 push()
和 pop()
操作在结构上与 NonblockingCounter
上相似,只是做的工作有些冒险,希望在 “提交” 工作的时候,底层假设没有失效。push()
方法观察当前最顶的节点,构建一个新节点放在堆栈上,然后,如果最顶端的节点在初始观察之后没有变化,那么就安装新节点。如果 CAS 失败,意味着另一个线程已经修改了堆栈,那么过程就会重新开始。
清单 3. 使用 Treiber 算法的非阻塞堆栈
public class ConcurrentStack<E> {
AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();
public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = head.get();
newHead.next = oldHead;
} while (!head.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = head.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!head.compareAndSet(oldHead,newHead));
return oldHead.item;
}
static class Node<E> {
final E item;
Node<E> next;
public Node(E item) { this.item = item; }
}
}
|
性能考虑
在轻度到中度的争用情况下,非阻塞算法的性能会超越阻塞算法,因为 CAS 的多数时间都在第一次尝试时就成功,而发生争用时的开销也不涉及线程挂起和上下文切换,只多了几个循环迭代。没有争用的 CAS 要比没有争用的锁便宜得多(这句话肯定是真的,因为没有争用的锁涉及 CAS 加上额外的处理),而争用的 CAS 比争用的锁获取涉及更短的延迟。
在高度争用的情况下(即有多个线程不断争用一个内存位置的时候),基于锁的算法开始提供比非阻塞算法更好的吞吐率,因为当线程阻塞时,它就会停止争用,耐心地等候轮到自己,从而避免了进一步争用。但是,这么高的争用程度并不常见,因为多数时候,线程会把线程本地的计算与争用共享数据的操作分开,从而给其他线程使用共享数据的机会。(这么高的争用程度也表明需要重新检查算法,朝着更少共享数据的方向努力。)“流行的原子” 中的图在这方面就有点儿让人困惑,因为被测量的程序中发生的争用极其密集,看起来即使对数量很少的线程,锁定也是更好的解决方案。
非阻塞的链表
目前为止的示例(计数器和堆栈)都是非常简单的非阻塞算法,一旦掌握了在循环中使用 CAS,就可以容易地模仿它们。对于更复杂的数据结构,非阻塞算法要比这些简单示例复杂得多,因为修改链表、树或哈希表可能涉及对多个指针的更新。CAS 支持对单一指针的原子性条件更新,但是不支持两个以上的指针。所以,要构建一个非阻塞的链表、树或哈希表,需要找到一种方式,可以用 CAS 更新多个指针,同时不会让数据结构处于不一致的状态。
在链表的尾部插入元素,通常涉及对两个指针的更新:“尾” 指针总是指向列表中的最后一个元素,“下一个” 指针从过去的最后一个元素指向新插入的元素。因为需要更新两个指针,所以需要两个 CAS。在独立的 CAS 中更新两个指针带来了两个需要考虑的潜在问题:如果第一个 CAS 成功,而第二个 CAS 失败,会发生什么?如果其他线程在第一个和第二个 CAS 之间企图访问链表,会发生什么?
对于非复杂数据结构,构建非阻塞算法的 “技巧” 是确保数据结构总处于一致的状态(甚至包括在线程开始修改数据结构和它完成修改之间),还要确保其他线程不仅能够判断出第一个线程已经完成了更新还是处在更新的中途,还能够判断出如果第一个线程走向 AWOL,完成更新还需要什么操作。如果线程发现了处在更新中途的数据结构,它就可以 “帮助” 正在执行更新的线程完成更新,然后再进行自己的操作。当第一个线程回来试图完成自己的更新时,会发现不再需要了,返回即可,因为 CAS 会检测到帮助线程的干预(在这种情况下,是建设性的干预)。
这种 “帮助邻居” 的要求,对于让数据结构免受单个线程失败的影响,是必需的。如果线程发现数据结构正处在被其他线程更新的中途,然后就等候其他线程完成更新,那么如果其他线程在操作中途失败,这个线程就可能永远等候下去。即使不出现故障,这种方式也会提供糟糕的性能,因为新到达的线程必须放弃处理器,导致上下文切换,或者等到自己的时间片过期(而这更糟)。
清单 4 的 LinkedQueue
显示了 Michael-Scott 非阻塞队列算法的插入操作,它是由 ConcurrentLinkedQueue
实现的:
清单 4. Michael-Scott 非阻塞队列算法中的插入
public class LinkedQueue <E> {
private static class Node <E> {
final E item;
final AtomicReference<Node<E>> next;
Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<Node<E>>(next);
}
}
private AtomicReference<Node<E>> head
= new AtomicReference<Node<E>>(new Node<E>(null, null));
private AtomicReference<Node<E>> tail = head;
public boolean put(E item) {
Node<E> newNode = new Node<E>(item, null);
while (true) {
Node<E> curTail = tail.get();
Node<E> residue = curTail.next.get();
if (curTail == tail.get()) {
if (residue == null) /* A */ {
if (curTail.next.compareAndSet(null, newNode)) /* C */ {
tail.compareAndSet(curTail, newNode) /* D */ ;
return true;
}
} else {
tail.compareAndSet(curTail, residue) /* B */;
}
}
}
}
}
|
像许多队列算法一样,空队列只包含一个假节点。头指针总是指向假节点;尾指针总指向最后一个节点或倒数第二个节点。图 1 演示了正常情况下有两个元素的队列:
图 1. 有两个元素,处在静止状态的队列
如 清单 4 所示,插入一个元素涉及两个指针更新,这两个更新都是通过 CAS 进行的:从队列当前的最后节点(C)链接到新节点,并把尾指针移动到新的最后一个节点(D)。如果第一步失败,那么队列的状态不变,插入线程会继续重试,直到成功。一旦操作成功,插入被当成生效,其他线程就可以看到修改。还需要把尾指针移动到新节点的位置上,但是这项工作可以看成是 “清理工作”,因为任何处在这种情况下的线程都可以判断出是否需要这种清理,也知道如何进行清理。
队列总是处于两种状态之一:正常状态(或称静止状态,图 1 和 图 3)或中间状态(图 2)。在插入操作之前和第二个 CAS(D)成功之后,队列处在静止状态;在第一个 CAS(C)成功之后,队列处在中间状态。在静止状态时,尾指针指向的链接节点的 next 字段总为 null,而在中间状态时,这个字段为非 null。任何线程通过比较 tail.next
是否为 null,就可以判断出队列的状态,这是让线程可以帮助其他线程 “完成” 操作的关键。
图 2. 处在插入中间状态的队列,在新元素插入之后,尾指针更新之前
插入操作在插入新元素(A)之前,先检查队列是否处在中间状态,如 清单 4 所示。如果是在中间状态,那么肯定有其他线程已经处在元素插入的中途,在步骤(C)和(D)之间。不必等候其他线程完成,当前线程就可以 “帮助” 它完成操作,把尾指针向前移动(B)。如果有必要,它还会继续检查尾指针并向前移动指针,直到队列处于静止状态,这时它就可以开始自己的插入了。
第一个 CAS(C)可能因为两个线程竞争访问队列当前的最后一个元素而失败;在这种情况下,没有发生修改,失去 CAS 的线程会重新装入尾指针并再次尝试。如果第二个 CAS(D)失败,插入线程不需要重试 —— 因为其他线程已经在步骤(B)中替它完成了这个操作!
图 3. 在尾指针更新后,队列重新处在静止状态
幕后的非阻塞算法
如果深入 JVM 和操作系统,会发现非阻塞算法无处不在。垃圾收集器使用非阻塞算法加快并发和平行的垃圾搜集;调度器使用非阻塞算法有效地调度线程和进程,实现内在锁。在 Mustang(Java 6.0)中,基于锁的 SynchronousQueue
算法被新的非阻塞版本代替。很少有开发人员会直接使用 SynchronousQueue
,但是通过 Executors.newCachedThreadPool()
工厂构建的线程池用它作为工作队列。比较缓存线程池性能的对比测试显示,新的非阻塞同步队列实现提供了几乎是当前实现 3 倍的速度。在 Mustang 的后续版本(代码名称为 Dolphin)中,已经规划了进一步的改进。
结束语
非阻塞算法要比基于锁的算法复杂得多。开发非阻塞算法是相当专业的训练,而且要证明算法的正确也极为困难。但是在 Java 版本之间并发性能上的众多改进来自对非阻塞算法的采用,而且随着并发性能变得越来越重要,可以预见在 Java 平台的未来发行版中,会使用更多的非阻塞算法。
分享到:
相关推荐
30 Java5 多线程实践.mht 31 Java 理论与实践 并发集合类.mht 32 Java 理论与实践 构建一个更好的 HashMap.mht 33 Java 理论与实践 JDK 5_0 中更灵活、更具可伸缩性的锁定机制.mht 34 Java 理论与实践 流行的...
在本项目“Java多线程与线程安全实践-基于Http协议的断点续传”中,我们将深入探讨如何利用Java的多线程机制实现HTTP协议下的断点续传功能,这对于大文件下载或上传的场景尤为实用。 断点续传是一种允许用户在中断...
在本项目中,Java被选为实现多线程OCR服务器的语言,因为它提供了丰富的多线程API,如`java.util.concurrent`包,可以方便地创建和管理并发任务。 【项目结构与实现】 “HeliosServer”可能包含以下组件: 1. **...
《基于Java开发的多线程下载工具》 在IT领域,高效的文件下载是不可或缺的一环,尤其是对于大文件,多线程下载技术可以显著提升下载速度。本项目是一款基于Java语言开发的多线程下载工具,它实现了HTTP和FTP协议的...
java.util.concurrent 多线程框架 java.util.concurrent 多线程框架是 Java 语言中用于多线程编程的库。该库提供了多种线程池实现、并发集合、同步器、lock 等多种机制,以便开发者更方便地编写高效、可靠的多线程...
"基于Java多线程同步的安全性研究" 本文主要研究了基于Java多线程同步的安全性问题,讨论了Java多线程同步机制的实现方法和安全性问题的解决方法。文章首先介绍了Java多线程同步的必要性和重要性,然后讨论了Java多...
Java中的多线程编程则基于这种操作系统级别的并发模型。 Java虚拟机(JVM)为每一个Java应用程序启动一个进程,而在这个进程中,所有的代码执行都是通过线程来完成的。默认情况下,Java程序的main方法在一个称为...
在Java编程中,多线程查询数据库是一种常见的优化策略,特别是在处理大数据量或者需要并行执行多个查询时。本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil....
Java多线程是Java编程语言中一个非常重要的概念,它允许开发者在一个程序中创建多个执行线程并行运行,以提高程序的执行效率和响应速度。在Java中,线程的生命周期包含五个基本状态,分别是新建状态(New)、就绪...
在本毕业设计中,主题聚焦于使用Java进行多线程编程和线程安全的实践,特别是在基于HTTP协议的断点续传功能的设计与实现上。这个项目旨在加深对Java并发编程的理解,以及如何利用网络协议解决实际问题。下面将详细...
本主题将深入探讨如何使用Java的并发包(java.util.concurrent)来实现多线程对数据库数据的批量处理,包括增、删、改等操作。 首先,我们需要了解Java中的线程基础。线程是程序执行的最小单位,一个进程可以包含多...
Java并发编程是Java平台中的重要特性,它提供了一套强大且高效的工具,使得开发者能够创建多线程程序,实现高效并行处理。本资源“java concurrent 精简源码”着重关注Java并发库(java.util.concurrent)的核心概念...
此外,Java 5引入了java.util.concurrent并发包,提供了更高级的线程管理工具,如ExecutorService、ThreadPoolExecutor和Future。ExecutorService允许我们创建线程池,有效地管理线程生命周期,避免频繁创建和销毁...
因此,如果多个线程需要访问同一资源,需要使用`synchronized`关键字或者`java.util.concurrent`包下的工具来保证数据一致性,防止数据竞争。 ### 7. 异常处理和关闭资源 在Java Socket编程中,确保在完成操作后...
在Java编程中,多线程是一项关键特性,它允许程序同时执行多个任务,极大地提高了效率。在这个特定的项目中,“java用多线程进行排序算法的比较”关注的是如何利用多线程技术来实现和比较不同的排序算法,尤其是快速...
本文将深入探讨Java中的多线程概念,以及如何通过实践来提升对这一主题的理解。 首先,我们要明白线程的基本概念。线程是操作系统分配CPU时间的基本单元,一个进程可以包含一个或多个线程。在Java中,我们可以创建...
在Java中,我们可以利用`java.util.concurrent`包中的工具来实现多线程快速排序。线程池如`ExecutorService`可以管理和控制线程的创建和销毁,避免过多线程导致的系统资源浪费。`Future`接口可以用来获取异步任务的...
Java多线程编程是Java开发中的...以上内容只是《Java多线程编程核心技术》教程中的一部分核心知识点,实际学习中还需要结合具体示例和实践来深入理解和掌握。通过学习,开发者可以编写出高效、稳定的多线程Java程序。
在本资料“基于java的多线程程序死锁检查 JCarder.zip”中,我们将探讨如何利用JCarder工具来检测和预防Java多线程死锁。 首先,让我们理解死锁的基本条件。在Java中,当满足以下四个条件时,就可能出现死锁: 1. ...
本项目“基于Java实现的多线程与线程安全实践-基于Http协议的断点续传”深入探讨了如何利用Java的多线程特性以及Http协议来实现断点续传功能。断点续传是一种在网络上传输大文件时,允许在传输中断后从上次中断的...