应用场景:服务端要处理大量的客户端的请求,并且处理请求耗费较长的时间。这时就需要使用并发处理。多线程是一种方法,这里使用Akka框架处理并发。(以下代码在Groovy1.7.5、akka-actors-1.2下运行成功)
这里有三个角色:Client、Master、Worker
Client傻乎乎地发同步请求给Master,一直等到结果返回客户端才离开。
Master接收客户端发来的请求,然后将请求交给Worker处理,处理完成之后将结果返回给Client。
Worker负责具体的业务处理,它耗费的事件比较长。
所以这里的关键在于Master,如果Master线性地“接收请求——调用Worker处理得到返回结果——将结果返回”,这样的系统必将歇菜。
使用Akka可以方便地将它变成并行地。
先看看Client,模拟同时多个客户端给Master发请求
import akka.actor.ActorRef
import static akka.actor.Actors.remote
/**
* User: 谢炜
* Date: 11-10-4
* Time: 下午7:36
*/
class HelloClient implements Runnable {
int seq
String serviceName
HelloClient(int seq, String serviceName) {
this.seq = seq
this.serviceName = serviceName
}
void run() {
ActorRef actor = remote().actorFor(serviceName, "10.68.15.113", 9999);
String str = "Hello--" + seq
println "请求-----${str}"
Object res = actor.sendRequestReply(str)
println "返回-----${res}"
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new HelloClient(i, "hello-service"))
thread.start() //同时启动5个客户端请求Master
}
}
}
真正干活的Worker:
import akka.actor.UntypedActor
/**
* User: 谢炜
* Date: 11-10-5
* Time: 上午9:27
*/
class HelloWorker extends UntypedActor { //Worker是一个Actor,需要实现onReceive方法
@Override
void onReceive(Object o) {
println "Worker 收到消息----" + o
if (o instanceof String) {
String result = doWork(o) //调用真实的处理方法
getContext().replyUnsafe(result)//将结果返回给Master
}
}
//Worker处理其实很简单,仅仅将参数字符串改造一下而已。只不过使其sleep了20秒,让它变得“耗时较长”
String doWork(String str) {
Thread.sleep(1000 * 20)
return "result----" + str + " 。"
}
}
负责并发调度的Master:
import akka.actor.ActorRef
import akka.actor.Actors
import akka.actor.UntypedActor
import akka.actor.UntypedActorFactory
import akka.dispatch.Future
import akka.dispatch.Futures
import java.util.concurrent.Callable
/**
* User: 谢炜
* Date: 11-10-5
* Time: 上午9:35
*/
class HelloMaster extends UntypedActor {
@Override
void onReceive(Object o) {
println "Master接收到Work消息:" + o
def clientChannel = getContext().channel() //客户端链接Channel
//启动worker actor
ActorRef worker = Actors.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new HelloWorker();
}
}).start();
//这里实现真正的并发
Future f1 = Futures.future(new Callable() {
Object call() {
def result = worker.sendRequestReply(o) //将消息发给worker actor,让Worker处理业务,同时得到返回结果
worker.stop()
println "Worker Return----" + result
clientChannel.sendOneWay(result) //将结果返回给客户端
return result
}
})
println "Future call over"
}
public static void main(String[] args) { //启动Master进程,绑定IP、端口和服务
Actors.remote().start("10.68.15.113", 9999).register(
"hello-service",
Actors.actorOf(HelloMaster.class));
}
}
看看客户端的调用日志
请求-----Hello--4
请求-----Hello--1
请求-----Hello--3
请求-----Hello--0
请求-----Hello--2
[GENERIC] [11-10-6 下午9:49] [RemoteClientConnected(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]
[GENERIC] [11-10-6 下午9:49] [RemoteClientStarted(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]
返回-----result----Hello--0 。
返回-----result----Hello--1 。
返回-----result----Hello--2 。
返回-----result----Hello--4 。
返回-----result----Hello--3 。
服务端的日志:
[GENERIC] [11-10-6 下午9:49] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@5a4fdf11,Some(/10.68.15.113:53462))]
Master接收到Work消息:Hello--1
Future call over
Master接收到Work消息:Hello--2
Future call over
Worker 收到消息----Hello--1
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
Worker 收到消息----Hello--2
Master接收到Work消息:Hello--0
Future call over
Master接收到Work消息:Hello--3
Worker 收到消息----Hello--0
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Future call over
Master接收到Work消息:Hello--4
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 收到消息----Hello--3
Future call over
Worker 收到消息----Hello--4
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 将消息Hello--1处理完成
Worker 将消息Hello--2处理完成
Worker Return----result----Hello--2 。
Worker Return----result----Hello--1 。
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] stopping
Worker 将消息Hello--0处理完成
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-3] [HelloWorker] stopping
Worker Return----result----Hello--0 。
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-23] [HelloWorker] stopping
Worker 将消息Hello--4处理完成
Worker 将消息Hello--3处理完成
Worker Return----result----Hello--4 。
Worker Return----result----Hello--3 。
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-11] [HelloWorker] stopping
[DEBUG] [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
可以从服务端日志看到,Master接收到Work消息后onReceive就结束了(函数最后打印Future call over),一连接收了5个消息,然后Worker才收到消息并处理。最后消息处理完成好后f1的call才收到Worker Return的消息。
这里使用Future实现并发。
如果不使用Future:
def result = worker.sendRequestReply(o) //将消息发给worker actor
println "Worker Return----" + result
getContext().replyUnsafe(result) // 将worker返回的消息回复给客户端
这就成了同步处理(第一个消息处理完后才接收并处理第二个消息)。
如果在Future后调用了f1.await()或f1.get(),也成同步的了,因为await将等待worker返回后再继续往下执行。
Future f1 = Futures.future(new Callable() {
Object call() {
def result = worker.sendRequestReply(o) //将消息发给worker actor
worker.stop()
println "Worker Return----" + result
clientChannel.sendOneWay(result)
return result
}
})
println "Future call over" + f1.get()
服务器日志如下:
[GENERIC] [11-10-6 下午10:06] [RemoteServerStarted(akka.remote.netty.NettyRemoteSupport@7e566633)]
[DEBUG] [11-10-6 下午10:06] [main] [HelloMaster] started
[GENERIC] [11-10-6 下午10:07] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@7e566633,Some(/10.68.15.113:53571))]
Master接收到Work消息:Hello--0
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--0
Worker 将消息Hello--0处理完成
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-5] [HelloWorker] stopping
Worker Return----result----Hello--0 。
Future call overresult----Hello--0 。
Master接收到Work消息:Hello--2
Worker 收到消息----Hello--2
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 将消息Hello--2处理完成
Worker Return----result----Hello--2 。
Future call overresult----Hello--2 。
Master接收到Work消息:Hello--3
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
[DEBUG] [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--3
Worker 将消息Hello--3处理完成
Worker Return----result----Hello--3 。
Future call overresult----Hello--3 。
Master接收到Work消息:Hello--4
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-14] [HelloWorker] stopping
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--4
Worker 将消息Hello--4处理完成
Worker Return----result----Hello--4 。
Future call overresult----Hello--4 。
Master接收到Work消息:Hello--1
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-18] [HelloWorker] stopping
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--1
Worker 将消息Hello--1处理完成
Worker Return----result----Hello--1 。
Future call overresult----Hello--1 。
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-21] [HelloWorker] stopping
Master接收到Work消息:Hello--6
[DEBUG] [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
Worker 收到消息----Hello--6
Worker 将消息Hello--6处理完成
Worker Return----result----Hello--6 。
Future call overresult----Hello--6 。
Master接收到Work消息:Hello--5
[DEBUG] [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-26] [HelloWorker] stopping
Worker 收到消息----Hello--5
[DEBUG] [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
需要注意的是,Akka默认使用环境变量%AKKA_HOME%\config\akka.conf配置,默认配置是client的read-timeout = 10(客户端连接10秒后将自动断开,这时服务端再给客户端发消息就发布了了。报RemoteServerWriteFailed异常),可以将值设为0,将一直连着不断开。
actor的timeout默认为5秒,也太短了,延长(不能设为0,0为总是超时).
分享到:
相关推荐
Akka 是一个强大的开源工具包和运行时,用于构建高度并发、...通过理解和熟练使用上述概念,你可以有效地利用Akka构建复杂的分布式应用程序。在实际项目中,还需要阅读官方文档、参与社区讨论和实践来深化理解和应用。
12. **Stream和Flow**:Akka Stream提供了一种处理数据流的方式,它可以高效地处理大量数据,同时保持系统资源的利用率。 在"akka"这个压缩包文件中,很可能包含了这些概念的示例代码,通过阅读和运行这些代码,你...
JActors则是专门针对Java设计的Actor库,它实现了Actor模型,帮助开发者以更简单的方式编写并发程序。 IBM作为一个全球知名的科技公司,也可能在他们的产品或服务中采用了Actor模型。例如,他们可能在分布式计算、...
- **分布式键值存储**:通过 Akka 构建分布式键值存储系统,展示了如何使用 Actor 来管理数据和实现容错机制。 - **文章解析服务**:构建了一个文章解析服务,演示了如何利用 Akka 的并发和分布式特性来提高处理效率...
1. **Actor系统**:Akka的核心是Actor模型,它是一种处理并发和分布式计算的抽象方式。每个Actor都是一个独立的执行单元,有自己的邮箱来接收消息,并且只允许单线程访问,确保了线程安全。Actor系统是这些Actor的...
Akka 提供了一种简单而强大的方式来处理这些问题,它允许开发者利用 Actor 模型来构建高并发、分布式的系统。通过 Akka,开发者可以更容易地构建出能够自动恢复故障、适应负载变化的系统。 **1.3 如何开始使用 Akka...
通过Actor模型,Akka简化了并发和异步编程的复杂性,并提供了一种优雅的方式来构建分布式的、容错的应用系统。 #### 1.2 Actors与Actor系统 Actor模型是Akka的核心概念之一。Actor是一种封装了状态和行为的对象,...
Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限自动机。 第 5 章 纵向扩展:并发...
异步处理在Akka中至关重要,它通过Future和Promise对象实现,允许代码在不等待结果的情况下继续执行,提高了系统的吞吐量。代码示例将展示如何使用这些工具进行异步编程,并处理回调和组合异步操作。 Akka还提供了...
Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限自动机。 第 5 章 纵向扩展:并发...
1. **Actor模型**:Akka的核心是Actor模型,这是一种处理并发和分布式计算的抽象方式。每个Actor都是一个独立的实体,有自己的状态和行为,通过消息传递与其他Actor交互。这种模型有助于减少共享状态,从而降低同步...
本 系列 中以前的文章介绍了如何通过以下方式实现并发性: 并行地在多个数据集上执行...您将了解如何使用 actor 模型的 Akka 实现。(Akka 是一个构建并发和分布式 JVM 应用程序的工具包和运行时。)请参阅 参考资
这部分会讲解Akka中用于异步编程的工具,如Future和Agent,它们是处理并发任务和状态共享的有力工具。 5.1网络集群规范(Networking Cluster Specification) 集群规范定义了如何建立和管理分布式Akka集群,以及...
Akka是基于Actor模型的库,提供了强大的并发和分布式处理能力。它包括Actor、Stream、HTTP服务器等功能,广泛应用于构建高可用、高伸缩性的应用。 学习Scala并发编程,除了阅读指定的《Scala编程》或《Scala编程...
- **Future和Actor配合使用**:结合Future和Actor处理异步消息。 - **直接使用Future**:直接利用Future处理异步任务。 - **Future连接方法**:组合多个Future。 - **Future和for配合使用**:利用for表达式处理...
总结起来,Akka的无状态未来和Actor模型是实现高效并发和异步处理的强大工具,而de.flapdoodle.embed.mongo则为我们提供了一个便捷的本地MongoDB运行环境。结合这两个技术,开发者可以构建出高性能、可扩展的Java和...
了解和掌握Scala的Actor,对于理解和使用Spark等大数据处理框架至关重要,因为Spark内部大量使用了Akka Actor来实现高效的并行计算。通过学习和实践,可以提高编写高效、可维护的并发程序的技能。
描述部分简单地提到这是一个幻灯片形式的学习资料,主要关注 Scala 中的并发抽象,并且提到了 `concurrent.Future[T]`、Akka 的 `actor` 和 RxScala 的 `Observable`。此外,还提示可以通过特定渠道访问这些内容并...
2. **异步API**: 扩展提供了丰富的异步操作接口,如get、set、remove等,这些操作都以Future返回,使得开发者可以在不阻塞主线程的情况下,优雅地处理数据存取任务。 3. **文档序列化**: 支持自动将Java或Scala对象...