`

Java并发的四种风味:Thread、Executor、ForkJoin和Actor

 
阅读更多

这篇文章讨论了Java应用中并行处理的多种方法。从自己管理Java线程,到各种更好几的解决方法,Executor服务、ForkJoin 框架以及计算中的Actor模型。

Java并发编程的4种风格:Threads,Executors,ForkJoin和Actors

我们生活在一个事情并行发生的世界。自然地,我们编写的程序也反映了这个特点,它们可以并发的执行。当然除了Python代码(译者注:链接里面讲述了Python的全局解释器锁,解释了原因),不过你仍然可以使用Jython在JVM上运行你的程序,来利用多处理器电脑的强大能力。

然而,并发程序的复杂程度远远超出了人类大脑的处理能力。相比较而言,我们简直弱爆了:我们生来就不是为了思考多线程程序、评估并发访问有限资源以及预测哪里会发生错误或者瓶颈。

面对这些困难,人类已经总结了不少并发计算的解决方案和模型。这些模型强调问题的不同部分,当我们实现并行计算时,可以根据问题做出不同的选择。

在这篇文章中,我将会用对同一个问题,用不同的代码来实现并发的解决方案;然后讨论这些方案有哪些好的地方,有哪些缺陷,可能会有什么样的陷阱在等着你。

我们将介绍下面几种并发处理和异步代码的方式:

• 裸线程

• Executors和Services

• ForkJoin框架和并行流

• Actor模型

为了更加有趣一些,我没有仅仅通过一些代码来说明这些方法,而是使用了一个共同的任务,因此每一节中的代码差不多都是等价的。另外,这些代码仅仅是展示用的,初始化的代码并没有写出来,并且它们也不是产品级的软件示例。

对了,最后一件事:在文章最后,有一个小调查,关于你或者你的组织正在使用哪种并发模式。为了你的工程师同胞们,请填一下调查!

任务

任务:实现一个方法,它接收一条消息和一组字符串作为参数,这些字符串与某个搜索引擎的查询页面对应。对每个字符串,这个方法发出一个http请求来查询消息,并返回第一条可用的结果,越快越好。

如果有错误发生,抛出一个异常或者返回空都是可以的。我只是尝试避免为了等待结果而出现无限循环。

简单说明:这次我不会真正深入到多线程如何通讯的细节,或者深入到Java内存模型。如果你迫切地想了解这些,你可以看我前面的文章利用JCStress测试并发

那么,让我们从最直接、最核心的方式来在JVM上实现并发:手动管理裸线程。

方法1:使用“原汁原味”的裸线程

解放你的代码,回归自然,使用裸线程!线程是并发最基本的单元。Java线程本质上被映射到操作系统线程,并且每个线程对象对应着一个计算机底层线程。

自然地,JVM管理着线程的生存期,而且只要你不需要线程间通讯,你也不需要关注线程调度。

每个线程有自己的栈空间,它占用了JVM进程空间的指定一部分。

线程的接口相当简明,你只需要提供一个Runnable,调用.start()开始计算。没有现成的API来结束线程,你需要自己来实现,通过类似boolean类型的标记来通讯。

在下面的例子中,我们对每个被查询的搜索引擎,创建了一个线程。查询的结果被设置到AtomicReference,它不需要锁或者其他机制来保证只出现一次写操作。开始吧!

1
2
3
4
5
6
7
8
9
10
11
private static String getFirstResult(String question, List<String> engines) {
 AtomicReference<String> result = new AtomicReference<>();
 for(String base: engines) {
   String url = base + question;
   new Thread(() -> {
     result.compareAndSet(null, WS.url(url).get());
   }).start();
 }
 while(result.get() == null); // wait for some result to appear
 return result.get();
}

使用裸线程的主要优点是,你很接近并发计算的操作系统/硬件模型,并且这个模型非常简单。多个线程运行,通过共享内存通讯,就是这样。

自己管理线程的最大劣势是,你很容易过分的关注线程的数量。线程是很昂贵的对象,创建它们需要耗费大量的内存和时间。这是一个矛盾,线程太少,你不能获得良好的并发性;线程太多,将很可能导致内存问题,调度也变得更复杂。

然而,如果你需要一个快速和简单的解决方案,你绝对可以使用这个方法,不要犹豫。

方法2:认真对待Executor和CompletionService

另一个选择是使用API来管理一组线程。幸运的是,JVM为我们提供了这样的功能,就是Executor接口。Executor接口的定义非常简单:

1
2
3
4
5
public interface Executor {
 
void execute(Runnable command);
 
}

它隐藏了如何处理Runnable的细节。它仅仅说,“开发者!你只是一袋肉,给我任务,我会处理它!”

更酷的是,Executors类提供了一组方法,能够创建拥有完善配置的线程池和executor。我们将使用newFixedThreadPool(),它创建预定义数量的线程,并不允许线程数量超过这个预定义值。这意味着,如果所有的线程都被使用的话,提交的命令将会被放到一个队列中等待;当然这是由executor来管理的。

在它的上层,有ExecutorService管理executor的生命周期,以及CompletionService会抽象掉更多细节,作为已完成任务的队列。得益于此,我们不必担心只会得到第一个结果。

下面service.take()的一次调用将会只返回一个结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static String getFirstResultExecutors(String question, List<String> engines) {
 ExecutorCompletionService<String> service = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(4));
 
 for(String base: engines) {
   String url = base + question;
   service.submit(() -> {
     return WS.url(url).get();
   });
 }
   try {
     return service.take().get();
   }
   catch(InterruptedException | ExecutionException e) {
     return null;
   }
}

如果你需要精确的控制程序产生的线程数量,以及它们的精确行为,那么executor和executor服务将是正确的选择。例如,需要仔细考虑的一个重要问题是,当所有线程都在忙于做其他事情时,需要什么样的策略?增加线程数量或者不做数量限制?把任务放入到队列等待?如果队列也满了呢?无限制的增加队列大小?

感谢JDK,已经有很多配置项回答了这些问题,并且有着直观的名字,例如上面的Executors.newFixedThreadPool(4)。

线程和服务的生命周期也可以通过选项来配置,使资源可以在恰当的时间关闭。唯一的不便之处是,对新手来说,配置选项可以更简单和直观一些。然而,在并发编程方面,你几乎找不到更简单的了。

总之,对于大型系统,我个人认为使用executor最合适。

方法3:通过并行流,使用ForkJoinPool (FJP)

Java 8中加入了并行流,从此我们有了一个并行处理集合的简单方法。它和lambda一起,构成了并发计算的一个强大工具。

如果你打算运用这种方法,那么有几点需要注意。首先,你必须掌握一些函数编程的概念,它实际上更有优势。其次,你很难知道并行流实际上是否使用了超过一个线程,这要由流的具体实现来决定。如果你无法控制流的数据源,你就无法确定它做了什么。

另外,你需要记住,默认情况下是通过ForkJoinPool.commonPool()实现并行的。这个通用池由JVM来管理,并且被JVM进程内的所有线程共享。这简化了配置项,因此你不用担心。

1
2
3
4
5
6
7
8
private static String getFirstResult(String question, List<String> engines) {
 // get element as soon as it is available
 Optional<String> result = engines.stream().parallel().map((base) -> {
   String url = base + question;
   return WS.url(url).get();
 }).findAny();
 return result.get();
}

看上面的例子,我们不关心单独的任务在哪里完成,由谁完成。然而,这也意味着,你的应用程序中可能存在一些停滞的任务,而你却无法不知道。在另一篇关于并行流的文章中,我详细地描述了这个问题。并且有一个变通的解决方案,虽然它并不是世界上最直观的方案。

ForkJoin是一个很好的框架,由比我更聪明的人来编写和预先配置。因此当我需要写一个包含并行处理的小型程序时,它是我的第一选择。

它最大的缺点是,你必须预见到它可能产生的并发症。如果对JVM没有整体上的深入了解,这很难做到。这只能来自于经验。

方法4:雇用一个Actor

Actor模型是对我们本文中所探讨的方法的一个奇怪的补充。JDK中没有actor的实现;因此你必须引用一些实现了actor的库。

简短地说,在actor模型中,你把一切都看做是一个actor。一个actor是一个计算实体,就像上面第一个例子中的线程,它可以从其他actor那里接收消息,因为一切都是actor。

在应答消息时,它可以给其他actor发送消息,或者创建新的actor并与之交互,或者只改变自己的内部状态。

相当简单,但这是一个非常强大的概念。生命周期和消息传递由你的框架来管理,你只需要指定计算单元是什么就可以了。另外,actor模型强调避免全局状态,这会带来很多便利。你可以应用监督策略,例如免费重试,更简单的分布式系统设计,错误容忍度等等。

下面是一个使用Akka Actors的例子。Akka Actors有Java接口,是最流行的JVM Actor库之一。实际上,它也有Scala接口,并且是Scala目前默认的actor库。Scala曾经在内部实现了actor。不少JVM语言都实现了actor,比如Fantom。这些说明了Actor模型已经被广泛接受,并被看做是对语言非常有价值的补充。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static class Message {
 String url;
 Message(String url) {this.url = url;}
}
static class Result {
 String html;
 Result(String html) {this.html = html;}
}
 
static class UrlFetcher extends UntypedActor {
 
 @Override
 public void onReceive(Object message) throws Exception {
   if (message instanceof Message) {
     Message work = (Message) message;
     String result = WS.url(work.url).get();
     getSender().tell(new Result(result), getSelf());
   } else {
     unhandled(message);
   }
 }
}
 
static class Querier extends UntypedActor {
 private String question;
 private List<String> engines;
 private AtomicReference<String> result;
 
 public Querier(String question, List<String> engines, AtomicReference<String> result) {
 
   this.question = question;
   this.engines = engines;
   this.result = result;
 }
 
 @Override public void onReceive(Object message) throws Exception {
   if(message instanceof Result) {
     result.compareAndSet(null, ((Result) message).html);
     getContext().stop(self());
   }
   else {
     for(String base: engines) {
       String url = base + question;
       ActorRef fetcher = this.getContext().actorOf(Props.create(UrlFetcher.class), "fetcher-"+base.hashCode());
       Message m = new Message(url);
       fetcher.tell(m, self());
     }
   }
 }
}
 
private static String getFirstResultActors(String question, List<String> engines) {
 ActorSystem system = ActorSystem.create("Search");
 AtomicReference<String> result = new AtomicReference<>();
 
 final ActorRef q = system.actorOf(
   Props.create((UntypedActorFactory) () -> new Querier(question, engines, result)), "master");
 q.tell(new Object(), ActorRef.noSender());
 
 while(result.get() == null);
 return result.get();
}

Akka actor在内部使用ForkJoin框架来处理工作。这里的代码很冗长。不要担心。大部分代码是消息类Message和Result的定义,然后是两个不同的actor:Querier用来组织所有的搜索引擎,而URLFetcher用来从给定的URL获取结果。这里代码行比较多是因为我不愿意把很多东西写在同一行上。Actor模型的强大之处来自于Props对象的接口,通过接口我们可以为actor定义特定的选择模式,定制的邮箱地址等。结果系统也是可配置的,只包含了很少的活动件。这是一个很好的迹象!

使用Actor模型的一个劣势是,它要求你避免全局状态,因此你必须小心的设计你的应用程序,而这可能会使项目迁移变得很复杂。同时,它也有不少优点,因此学习一些新的范例和使用新的库是完全值得的。

反馈时间:你使用什么?

你最常用的并发方式是什么?你理解它背后的计算模式是什么吗?仅仅使用一个包含Job或者后台任务对象的框架来自动地为你的代码添加异步计算能力?

为了收集更多信息,以找出我是否应该继续更深入地讲解一些不同的并发模式,例如,写一篇关于Akka如何工作,以及它Java接口的优点和缺点,我创建了一个简单的调查。亲爱的读者,请填一下调查表。我非常感谢你的互动!

总结

这篇文章中我们讨论了在Java应用中添加并行的几种不同方法。从我们自己管理Java线程开始,我们逐渐地发现更高级的解决方案,执行不同的executor服务、ForkJoin框架和actor计算模型。

不知道当你面临真实问题时该如何选择?它们都有各自的优缺点,你需要在直观和易用性、配置和增加/减少机器性能等方面做出选择。

原文链接: Oleg Shelajev 翻译: ImportNew.com shenggordon
译文链接: http://www.importnew.com/14506.html
分享到:
评论

相关推荐

    Java并发的四种风味ThreadExecutorFork

    本文将深入探讨Java并发的四种风味:Thread、Executor、ForkJoin以及Actor模型,旨在帮助Java开发者更好地理解和运用这些并发工具。 首先,我们来看最基本的线程(Thread)机制。在Java中,线程是程序执行的基本...

    掌握并发的钥匙:Java Executor框架深度解析

    本文将深入探讨Java的Executor框架,这是一个用于管理和执行并发任务的强大工具。通过理解其核心概念和工作原理,开发者可以更有效地利用多核处理器的优势,提高应用程序的性能和响应能力。 ## Java的主要特点回顾 ...

    java并发编程实战源码,java并发编程实战pdf,Java

    4. **线程池**:Java的Executor框架提供了一种管理线程的方式,通过ThreadPoolExecutor可以创建线程池,有效控制并发线程的数量,避免系统资源的过度消耗。 5. **并发集合**:Java的并发集合类库,如...

    java并发编程:Executor、Executors、ExecutorService.docx

    Java并发编程中的Executor、Executors和ExecutorService是Java并发编程框架的重要组成部分,它们为开发者提供了高效管理和控制线程执行的工具。以下是对这些概念的详细解释: 1. Executor: Executor是一个接口,它...

    java 并发编程的艺术pdf清晰完整版 源码

    4. **线程池**:Java的Executor框架是管理线程的重要工具,书中会详细介绍ThreadPoolExecutor的工作原理,以及如何配置和使用线程池来优化并发性能。 5. **并发设计模式**:书中可能会介绍一些常见的并发设计模式,...

    Java 并发核心编程

    Java的`java.util.concurrent.Executor`框架提供了一种灵活的方式来管理和控制线程池中的任务执行: 1. **ThreadPoolExecutor**: 提供了一个可配置的线程池实现。 2. **ScheduledExecutorService**: 支持定时任务和...

    JAVA并发编程艺术pdf版

    《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...

    Java 并发编程实战.pdf

    - **Java并发机制**:Java提供了丰富的并发机制来支持多线程程序的编写,包括但不限于`Thread`类、`Runnable`接口、`Callable`接口以及`Executor`框架等。 - **锁机制**:Java中的锁机制主要包括synchronized关键字...

    java并发编程实战.zip

    1. **Java并发基础**:介绍Java并发编程的基础知识,包括线程的创建与使用、线程的状态模型(新建、运行、阻塞、等待、终止)以及Java中的Thread类和Runnable接口。 2. **同步机制**:讲解了Java中的基本同步机制,...

    java并发编程书籍

    Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...

    java并发编程内部分享PPT

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。这份“java并发编程内部分享PPT”显然是一个深入探讨这一主题的资料,旨在帮助开发者...

    Java并发编程实践(Java Concurrency in Practice) (中英版)

    《Java并发编程实践》是Java开发者深入理解和应用并发编程的权威指南,这本书全面覆盖了Java并发编程的各种核心概念和技术,旨在帮助程序员编写出高效、安全的并发代码。书中的内容既包括理论知识,也包含丰富的实战...

    java并发编程经典书籍(英文版)

    Java并发编程是Java开发者必须掌握的关键技能之一,尤其是在多核处理器和分布式系统广泛使用的今天。以下是对标题和描述中提及的两本经典书籍——《Concurrent Programming in Java》和《Java Concurrency in ...

    一本经典的多线程书籍 Java并发编程 设计原则与模式 第二版 (英文原版)

    此外,还会讲解Java中实现并发的基础设施,如Thread类、Runnable接口以及ExecutorService等。 2. **同步机制**:详细解析了Java中的同步工具,如`synchronized`关键字、volatile变量、java.util.concurrent包中的...

    Java并发编程实战

    5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...

    Java 并发编程实战(高清带目录).zip

    2. **Java并发API**:详细讲解了Java并发库中的关键类和接口,如Thread、Runnable、ExecutorService、Future、Semaphore、CyclicBarrier、CountDownLatch、Exchanger以及Lock(ReentrantLock、ReadWriteLock)等,...

    Java并发编程:设计原则与模式(第二版)-3PDF

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java平台上的多线程和并发编程的权威著作。这本书旨在帮助开发者理解和掌握如何有效地编写可扩展且高效的并发程序。以下是书中涵盖的一些关键知识点: 1....

    java并发书籍xxxxxxx

    8. **Executor框架**:Java 5引入的Executor框架简化了线程池的管理和线程的创建,通过ThreadPoolExecutor可以创建自定义线程池,控制并发程度。 9. **Future和Callable**:Future代表异步计算的结果,Callable接口...

Global site tag (gtag.js) - Google Analytics