1. 背景知识
现在的操作系统都是支持多任务的,多任务可通过多进程或多线程的方式去实现,进程和线程的对比就不在这里说了,在多任务的调度上操作系统采取抢占式和协作式两种方式,抢占式是指操作系统给每个任务一定的执行时间片,在到达这个时间片后如任务仍然未释放对CPU的占用,那么操作系统将强制释放,这是目前多数操作系统采取的方式;协作式是指操作系统按照任务的顺序来分配CPU,每个任务执行过程中除非其主动释放,否则将一直占据CPU,这种方式非常值得注意的是一旦有任务占据CPU不放,会导致其他任务”饿死”的现象,因此操作系统确实不太适合采用这种方式。
说完操作系统多任务的调度方式后,来看看通常程序是如何实现支持高并发的,一种就是典型的基于操作系统提供的多进程或多线程机制,每个任务占据一个进程或一个线程,当任务中有IO等待等动作时,则将进程或线程放入待调度队列中,这种方式是目前大多数程序采取的方式,这种方式的坏处在于如想支持高的并发量,就不得不创建很多的进程或线程,而进程和线程都是要消耗不少系统资源的,另外一方面,进程或线程创建太多后,操作系统需要花费很多的时间在进程或线程的切换上,切换动作需要做状态保持和恢复,这也会消耗掉很多的系统资源;另外一种方式则是每个任务不完全占据一个进程或线程,当任务执行过程中需要进行IO等待等动作时,任务则将其所占据的进程或线程释放,以便其他任务使用这个进程或线程,这种方式的好处在于可以减少所需要的原生的进程或线程数,并且由于操作系统不需要做进程或线程的切换,而是自行来实现任务的切换,其成本会较操作系统切换低,这种方式也就是本文的重点,Coroutine方式,又称协程方式,这种方式在目前的大多数语言中都有支持。
各种语言在实现Coroutine方式的支持时,多数都采用了Actor Model来实现,Actor Model简单来说就是每个任务就是一个Actor,Actor之间通过消息传递的方式来进行交互,而不采用共享的方式,Actor可以看做是一个轻量级的进程或线程,通常在一台4G内存的机器上,创建几十万个Actor是毫无问题的,Actor支持Continuations,即对于如下代码:
Actor
act方法
进行一些处理
创建并执行另外一个Actor
通过消息box阻塞获取另一个Actor执行的结果
继续基于这个结果进行一些处理
在支持Continuations的情况下,可以做到消息box阻塞时并不是进程或线程级的阻塞,而只是Actor本身的阻塞,并且在阻塞时可将所占据的进程或线程释放给其他Actor使用,Actor Model实现最典型的就是erLang了。
对于Java应用而言,传统方式下为了支持高并发,由于一个线程只能用于处理一个请求,即使是线程中其实有很多IO中断、锁等待也同样如此,因此通常的做法是通过启动很多的线程来支撑高并发,但当线程过多时,就造成了CPU需要消耗不少的时间在线程的切换上,从而出现瓶颈,按照上面对Coroutine的描述,Coroutine的方式理论上而言能够大幅度的提升Java应用所能支撑的并发量。
2. 在Java中使用Coroutine
Java尚不能从语言层次上支持Coroutine,也许Java 7能够支持,目前已经有了一个测试性质的版本<!--[if !supportFootnotes]-->[1]<!--[endif]-->,在Sun JDK 7尚未正式发布的情况下如希望在Java中使用Coroutine,Scala或Kilim是可以做的选择,来分别看下。
Scala是现在很火的语言之一,Twitter消息中间件基于Scala编写更是让Scala名声鹊起,除了在语法方面所做出的改进外,其中一个最突出的特色就是Scala Actor,Scala Actor是Scala用于实现Coroutine的方式,先来具体看看Scala在Coroutine支持实现的关键概念。
Actor
Scala Actor可以看做是一个轻量级的Java Thread,其使用方式和Java Thread基本也一致,继承Actor,实现act方法,启动时也是调用start方法,但和Java Thread不同的是,Scala Actor可等待外部发送过来的消息,并进行相应的处理。
Actor的消息发送机制
发送消息到Actor的方式有异步、Future两种方式,异步即指发送后立即返回,继续后续流程,使用异步发送的方法为:actor ! MessageObject,其中消息对象可以为任何类型,并且Scala还支持一种称为case Object的对象,便于在收到消息时做pattern matching。
Future方式是指阻塞线程等待消息处理的结果,使用Future方式发送的方法为:actor !! MessageObject,在等待结果方面,Scala支持不限时等待,限时等待以及等待多个Future或个别Future完成,使用方法如下:
val ft=actor !! MessageObject // Future方式发送消息
val result=ft() // 不限时等待
val results=awaitAll(500,ft1,ft2,ft3) // 限时等待多个Future返回值
val results=awaitEither(ft1,ft2) // 等待个别future完成
接收消息方通过reply方法返回Future方式所等待的结果。
Actor的消息接收机制
当代码处于Actor的act方法或Actor环境(例如为Actor的act方法调用过来的代码)中时,可通过以下两种方式来接收外部发送给Actor的消息:一为receive方式,二为react方式,代码例子如下:
receive{
case MessageObject(args) => doHandle(args)
}
react{
case MessageObject(args) => doHandle(args)
}
receive和react的差别在于receive需要阻塞当前Java线程,react则仅为阻塞当前Actor,但并不会阻塞Java线程,因此react模式更适合于充分发挥coroutine带来的原生线程数减少的好处,但react模式有个缺点是react不支持返回。
receive和react都有限时接收的方式,方法为:receiveWithin(timeout)、reactWithin(timeout),超时的消息通过case TIMEOUT的方式来接收。
下面来看基于Scala Actor实现并发处理请求的一个简单例子。
class Processor extends Actor{
def act(){
loop{
react{
case command:String => doHandle(command)
}
}
}
def doHandle(command:String){
// 业务逻辑处理
}
}
当需要并发执行此Processor时,在处理时需要的仅为调用以下代码:
val processor=new Processor()
processor.start
processor ! “Hello”
从以上说明来看,要在旧的应用中使用Scala还是会有一些成本,部署运行则非常简单,在Scala IDE Plugin编写了上面的scala代码后,即生成了java class文件,可直接在jvm中运行。
Kilim是由剑桥的两位博士开发的一个用于在Java中使用Coroutine的框架,Kilim基于Java语法,先来看看Kilim中的关键概念。
Task
可以认为Task就是Actor,使用方式和Java Thread基本相同,只是继承的为Task,覆盖的为execute方法,启动也是调用task的start方法。
Task的消息发送机制
Kilim中通过Mailbox对象来发送消息,Mailbox的基本原则为可以有多个消息发送者,但只能有一个消息接收者,发送的方式有同步发送、异步发送和阻塞线程方式的同步发送三种,同步发送是指保证一定能将消息放入发送队列中,如当前发送队列已满,则等待到可用为止,阻塞的为当前Task;异步发送则是尝试将消息放入发送队列一次,如失败,则返回false,成功则返回true,不会阻塞Task;阻塞线程方式的同步发送是指阻塞当前线程,并保证将消息发送给接收者,三种方式的使用方法如下:
mailbox.put(messageObject); // 同步发送
mailbox.putnb(messageObject); // 异步发送
mailbox.putb(messageObject); // 阻塞线程方式发送
Task的消息接收机制
Kilim中通过Mailbox来接收消息,接收消息的方式有同步接收、异步接收以及阻塞线程方式的同步接收三种,同步接收是指阻塞当前Task,直到接收到消息才返回;异步接收是指立刻返回Mailbox中的消息,有就返回,没有则返回null;阻塞线程方式的同步接收是指阻塞当前线程,直到接收到消息才返回,使用方法如下:
mailbox.get(); // 同步接收,传入long参数表示等待的超时时间,单位为毫秒
mailbox.getnb(); // 异步接收,立刻返回
mailbox.getb(); // 阻塞线程方式接收
下面来看基于Kilim实现并发处理请求的一个简单例子。
public class Processor extends Task{
private String command;
public Processor(String command){
this.command=command;
}
public void execute() throws Pausable,Exception{
// 业务逻辑处理
}
}
在处理时,仅需调用以下代码:
Task processor=new Processor(command);
processor.start();
从以上代码来看,Kilim对于Java人员而言学习门槛更低,但对于需要采用coroutine方式执行的代码在编译完毕后,还需要采用Kilim的kilim.tools.Weaver类来对这些已编译出来的class文件做织入,运行时需要用织入后生成的class文件才行,织入的方法为:java kilim.tools.Weaver –d [织入后生成的class文件存放的目录] [需要织入的类文件所在的目录],目前尚没有Kilim IDE Plugin可用,因此weaver这个过程还是比较的麻烦。
上面对Scala和Kilim做了一个简单的介绍,在实际Java应用中使用Coroutine时,通常会出现以下几种典型的更复杂的使用场景,由于Actor模式本身就是异步的,因此其天然对异步场景支持的就非常好,更多的问题会出现在以下几个同步场景上,分别来看看基于Scala、Kilim如何来实现。
Actor同步调用
Actor同步调用是经常会出现的使用场景,主要为Actor发送消息给其他的Actor处理,并等待结果才能继续。
n Scala
对于这种情况,在Scala 2.7.7中,目前可采取的为以下两种方法:
一种为通过Future方式发送消息来实现:
class Processor(command:String) extends Actor{
def act(){
val actor=new NetSenderActor()
val ft=actor !! command
println(ft())
}
}
class NetSenderActor extends Actor{
def act(){
case command:String => {
reply(“received command:”+command)
}
}
}
第二种为通过receive的方式来实现:
class Processor(command:String) extends Actor{
def act(){
val actor=new NetSenderActor()
actor ! command
var senderResult=””
receive{
case result:String => {
senderResult=result
}
}
println(senderResult)
}
}
class NetSenderActor extends Actor{
def act(){
case command:String => {
sender ! (“received command:”+command)
}
}
}
但这两种方式其实都不好,因为这两种方式都会造成当前Actor的线程阻塞,这也是因为目前Scala版本对continuations尚不支持的原因,Scala 2.8版本将提供continuations的支持,希望到时能有不需要阻塞Actor线程实现上述需求的方法。
还有一种常见的场景是Actor调一段普通的Scala类,然后那个类中进行了一些处理,并调用了其他Actor,此时在该类中如需要等待Actor的返回结果,也可使用上面两种方法。
n Kilim
在Kilim中要实现Task之间的同步调用非常简单,代码如下:
public class TaskA extends Task{
public void execute() throws Pausable,Exception{
Mailbox<Object> result=new Mailbox<Object>();
Task task=new TaskB(result);
task.start();
Object resultObject=result.get();
System.out.println(resultObject);
}
}
public class TaskB extends Task{
private Mailbox<Object> result;
public TaskB(Mailbox<Object> result){
this.result=result;
}
public void execute() throws Pausable,Exception{
result.put(“result from TaskB”);
}
}
Kilim的Mailbox.get并不会阻塞线程,因此这种方式是完全满足需求的。
普通Java代码同步调用Actor
由于已有的应用是普通的Java代码,经常会出现这样的场景,就是希望实现在这些Java代码中同步的调用Actor,并等待Actor的返回结果,但由于Scala和Kilim都强调首先必须是在Actor或Task的环境下才行,因此此场景更佳的方式应为Scala Actor(Kilim Task) à Java Code à Scala Actor(Kilim Task),这种场景在对已有的应用中会是最常出现的,来看看在Scala和Kilim中如何应对这样的需求。
n Scala
目前Scala中如希望在Java Code中调用Scala Actor,并等待其返回结果,暂时还没办法,做法只能改为从Java Code中去调一个Scala的Object,然后在这个Object中调用Actor,并借助上面提到的receive或future的方法来获取返回值,最后将这个返回值返回Java Code。
n Kilim
目前Kilim中如希望实现上面的需求,其实非常简单,只需要在Java Code的方法上加上Throw Pausable,然后通过mailbox.get来等待Kilim Task返回的结果即可,在Kilim中只要调用栈上的每个方法都有Throw Pausable,就可在这些方法上做等待返回这类的同步操作。
从上面这两个最常见的需求来看,无疑Kilim更符合需求,但要注意的是对于Kilim而言,如果出现Task call nonpausable method call pausable method这样的状况时,pausable method中如果想执行阻塞当前Task的操作,是无法做到的,只能改造成Task (在mailbox上做等待,并传递mailbox给后续步骤) call nonpausable method (传递mailbox) call pausable method (将逻辑转为放入一个Task中,并将返回值放入传递过来的mailbox),这种状况在面对spring aop、反射调用等现象时就会出现了,目前kilim 0.6的版本尚未提供更透明的使用方法,不过据kilim作者提供的一个试用版本,其中已经有了对于反射调用的透明化的支持,暂时在目前只能采用上述方法,迁移成本相对较大,也许以后的kilim版本会考虑这样的场景,提供相应的方法来降低迁移的成本。
3. 性能、所能支撑的并发量对比
在对Scala、Kilim有了这些了解后,来具体看看采用Scala、Kilim后与传统Java方式在性能、所能支撑的并发量上的对比。
测试模型
采用一个比较简单的模型进行测试,具体为有4个线程,这4个线程分别接收到了一定数量的请求,每个请求需要交给另外一个线程去执行,这个线程所做的动作为循环10次获取另外一个线程的执行结果,此执行线程所做的动作为循环1000次拼接一个字符串,然后返回。
实现代码
由于目前Scala版本对Continuation支持不够好,但上面的场景中又有此类需求,所以导致Scala版本的代码写的比较麻烦一些。
实现代码以及可运行的环境请从此处下载:http://www.bluedavy.com/open/benchmark.zip到此为止,基本上对Java中使用Coroutine的相关知识做了一个介绍,总结而言,采用Coroutine方式可以很好的绕开需要启动太多线程来支撑高并发出现的瓶颈,提高Java应用所能支撑的并发量,但在开发模式上也会带来变化,并且需要特别注意不能造成线程被阻塞的现象,从开发易用和透明迁移现有Java应用两个角度而言目前Coroutine方式还有很多不足,但相信随着越来越多的人在Java中使用Coroutine,其易用性必然是能够得到提升的。
相关推荐
总结起来,尽管Java自身尚未在语言层面内置协程支持,但通过使用像Scala和Kilim这样的第三方工具,开发者可以在Java环境中实现类似协程的功能,以提升并发性能。随着技术的发展,未来的Java版本可能会考虑引入对协程...
此外,协程中的通信通常通过通道(Channel)或其他高级并发构造实现,避免了竞态条件和死锁的问题。 源码分析是理解这两种机制的关键。对于线程,我们可以查看`Thread`类的源码,了解其启动、运行和同步的实现原理...
在实际应用中,我们可以通过以下步骤使用Java协程: 1. 创建协程上下文,比如基于`Executors.newFixedThreadPool`的线程池。 2. 使用协程启动器启动协程,传入协程体。 3. 在协程体中,编写`suspend`函数并调用`...
Quasar 使用 Java 的注解处理器和字节码操纵技术,使得协程能够在 Java 平台上无缝运行。通过使用 Quasar,开发者可以创建 Fiber,这是一种用户态的轻量级线程,它能够在无需操作系统支持的情况下进行上下文切换。 ...
3. **Suspension Points**:在协程中,你可以定义挂起点,让协程暂停执行,直到被恢复。 4. **Channels**:类似于线程间的管道,用于协程间的数据通信。 在实际应用中,Amareya Java 协程可以帮助优化并发编程,...
Java项目:在Android中使用Model-View-Intent(MVI)架构模式下对Kotlin协程的性能分析 概述:本文通过对Kotlin协程的性能分析,比较了在Android中使用Model-View-Intent(MVI)架构模式下Kotlin协程和RxJava ...
在Java中使用Akka,你需要导入相关的依赖库,并创建actor系统。ActorSystem是Akka的核心,它负责管理所有的actor。你可以通过创建一个ActorSystem实例来启动你的actor系统: ```java import akka.actor.ActorSystem...
协程提供了优雅的异常处理方式,如果协程中抛出异常,它会被捕获并在协程作用域的上下文中处理。例如: ```kotlin scope.launch { try { val result = async(Dispatchers.IO) { performNetworkRequest() }.await...
在Java中实现协程通常需要借助字节码增强技术,因为Java原生并不支持协程。 本示例"基于Java字节码增强技术实现的协程Demo"可能包含以下几个关键知识点: 1. **字节码理解**:Java字节码是JVM(Java虚拟机)能够...
协程Coroutine和Kilim是两个与并发编程相关的概念,主要在Java开发环境中被讨论。在现代软件系统中,多线程和并发处理是提升性能和响应能力的关键技术。协程提供了一种轻量级的并发模型,它比传统的线程更高效,内存...
然而,`runBlocking`不应该在协程中使用,因为它的设计初衷是为了在常规代码块中阻塞线程以等待协程完成。 协程的取消是合作性的,这意味着它允许计算代码在需要的时候进行取消。为了使资源能够正确关闭,我们通常...
在Kotlin中,我们可以使用Kotlin的协程来异步加载网络数据,避免阻塞UI线程。 2. 使用Retrofit:定义一个接口,该接口包含需要的HTTP请求方法,如GET、POST等,并使用注解指定URL和参数。Retrofit会自动生成实现这...
在Java中,可以这样使用: ```java KotlinClass kotlinObj = new KotlinClass(10); int incrementedValue = kotlinObj.increment(); System.out.println(incrementedValue); ``` Java调用Kotlin的函数时,无需额外...
雅各布提供Java协程的轻量级库用法添加对jacob-core的依赖< dependency> < groupId>com.guujiang</ groupId> < artifactId>jacob-core</ artifactId> < version>0.0.1</ version></ dependency>写一个包含一个...
4. **错误处理**:集成异常处理机制,确保在协程中发生错误时能够正确地捕获和处理。 5. **并发控制**:提供同步原语,如互斥锁、信号量等,用于控制并发访问资源。 6. **性能优化**:尽可能减少线程创建和销毁的...
java重量级源码Kotlin 协程等待 OkHttp3 的扩展 这是一个小型库,提供await()扩展以与 Kotlin 协程集成 以实施为基础。 需要 Kotlin 1.3+ 取决于所以不需要更新到需要 Java 8+ 或 Android 5+ 用法 // Create OkHttp ...
10. 从代码片段中可以看出,libco库支持使用C++、Java、golang等多种编程语言的接口,显示出其跨语言的兼容性。 11. libco协程库的使用涉及到栈(stack)和环境(环境)的管理,这是实现协程机制不可或缺的部分。 ...
在实际的转换过程中,工具可能会遇到挑战,比如处理Java的异常处理机制(try-catch-finally)到Python的异常处理(try-except-finally),或者将Java的多线程代码转换为Python的并发模型(如线程、进程或协程)。...
这里的`suspend`关键字表示这个函数将在协程中运行,可以被挂起。 然后,我们创建一个`NetworkRepository`,它是业务逻辑层,负责处理网络请求。在这个类中,我们可以使用协程来发起网络请求,并使用`try-catch`...
协程是一种轻量级的线程,它在执行过程中可以暂停和恢复,相比传统的线程,协程具有更少的上下文切换开销,更适合于I/O密集型任务。以下将详细解析阿里JDK协程架构的关键设计与实现原理。 1. **协程基础概念**:...