假设有一个很耗时的运算,单台机器已经没法满足需求,这时你可以想到由多台计算机协作完成。具体怎么做呢。
举个很简单的例子,假设这个耗时的运算是从1加到100000,你现在有两台服务器,可以让这两台服务器分别完成从1加到50000,和从50001加到100000,然后本机完成这两个结果之和。
两台服务器分别启动两个akka Server,同时还有一个CalcActor。这个计算actor接收两个参数:Integer start和Integer end,可以从start一直加到end,最后将结果返回给发送者:getSender().tell(result)。
@Log4j class CalcActor extends UntypedActor { @Override void onReceive(Object message) { log.debug "CalcActor received: ${message}----self:${getSelf()},sender:${getSender()}" if (message instanceof String) { String[] args = message.split(",") int start = Integer.parseInt(args[0]) int end = Integer.parseInt(args[1]) double result = 0d println("start calc:" + start + " upto " + end) start.upto(end) { result += it } sleep(5000) //模拟还要额外耗时5秒 println("result:" + result) getSender().tell(result) } else { unhandled(message) } } } 两个服务器分别为:
AkkaServerApp serverA = new AkkaServerApp("sc", "10.68.3.122", 8888, "calc") //AkkaSystemName为sc,ip为10.68.3.122,端口为8888,serviceName为calc。 AkkaServerApp serverA = new AkkaServerApp("sp", "10.68.3.124", 8888, "calc")//AkkaSystemName为sp,ip为10.68.3.124,端口为8888,serviceName为calc。
主要的代码在客户端:
public static void main(String[] args) throws Exception { final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");//客户端akka配置 ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");//将CalcActor发布到远程10.68.3.122上 ActorRef remoteCalcA2 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA2");//将CalcActor发布到远程10.68.3.124上 final List<Future<Double>> frs = new ArrayList<Future<Double>>();//异步返回结果Future存放在list中 //tell只请求,是否响应它完全不知道。ask是请求,并明确知道未来会相应。 // remoteCalcA.tell("1,10000", app.getServerActor()); // remoteCalcB.tell("10001,20000", app.getServerActor()); Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,50000", 150000);//让远程122计算从1加到50000,超时时间为150秒 Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "50001,100000", 150000);//并发地让远程124计算从50001加到100000,超时时间150秒 frs.add(f1); frs.add(f2); Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());将未来返回的结果转换成Future<Iterable<Double>> Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() { @Override public Double apply(Iterable<Double> parameter) { Double result = 0d; for (Double s : parameter) {//计算两个服务器返回的结果 result += s; } return result; } }); fr.onSuccess(new OnSuccess<Double>() { @Override public void onSuccess(Double result) { System.out.println("云计算返回结果-----" + result); } }); }
还可以让服务器并发处理:把给从1加到50000的任务分成5个线程并行处理:1..10000,10001..20000,20001..30000,30001..40000,40001..50000,这样能更好地提高效率。
如果按上面的方法仅仅是发布多个remote actor:
ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");
是没法提高效率的,因为这时的CalcActor是单线程的,它只会先接收1..10000,处理完后再接收10001..20000并处理。。。。。
使其能够并行处理很简单,创建remoteActor时加上withRoute即可:
ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(5)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn"); //RoundRobinRouter的参数5可以理解为分配5个线程并行处理
代码跟上面基本相同
public static void main(String[] args) throws Exception { final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client"); ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1"); ActorRef remoteCalcB1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sp", "10.68.3.124", 8888)))), "clientCalcB1"); final List<Future<Double>> frs = new ArrayList<Future<Double>>(); Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,10000", 150000); Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "10001,20000", 150000); Future f3 = akka.pattern.Patterns.ask(remoteCalcA1, "20001,30000", 150000); Future f4 = akka.pattern.Patterns.ask(remoteCalcA1, "30001,40000", 150000); Future f5 = akka.pattern.Patterns.ask(remoteCalcB1, "40001,50000", 150000); Future f6 = akka.pattern.Patterns.ask(remoteCalcB1, "50001,60000", 150000); Future f7 = akka.pattern.Patterns.ask(remoteCalcB1, "60001,70000", 150000); Future f8 = akka.pattern.Patterns.ask(remoteCalcB1, "70001,80000", 150000); frs.add(f1); frs.add(f2); frs.add(f3); frs.add(f4); frs.add(f5); frs.add(f6); frs.add(f7); frs.add(f8); Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher()); Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() { @Override public Double apply(Iterable<Double> parameter) { Double result = 0d; for (Double s : parameter) { result += s; } return result; } }); fr.onSuccess(new OnSuccess<Double>() { @Override public void onSuccess(Double result) { System.out.println("云计算返回从1加到80000的结果-----" + result); } }); } http://m.oschina.net/blog/81118
相关推荐
Akka是一个用于构建分布式、高并发、容错性应用程序的开源库和运行时环境,它建立在Actor模型上。在2014年版本的《Akka实战》一书中,作者深入讲解了Akka的核心概念、设计思想以及如何利用Akka构建实际的应用程序。 ...
2. **Akka C2**(Component Management for Akka):Akka 是一个用 Scala 编写的开源框架,用于构建高度并行、分布式和反应式的应用程序。C2(Component Container)是 Akka 的一个扩展,它提供了更高级别的组件管理...
Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...
2. **消息传递**:Actor之间的通信是通过发送和接收消息进行的,这些消息可以是任何.NET对象。Akka.NET提供了`IActorRef`接口,代表了一个Actor的引用,用于发送消息。使用`Tell`方法非阻塞地发送消息,而`Ask`方法...
2. Akka ActorSystem:创建和配置ActorSystem以支持远程调用。 3. 配置文件:编写`application.conf`以启用远程部署和指定TCP设置。 4. 服务端和客户端Actor:定义Actor类,实现消息处理逻辑。 5. 远程Actor引用:在...
2. **Actor**:Actor是Akka的基本执行单元,它通过消息传递与外界通信,每个Actor都有自己的邮箱来接收消息,保证了线程安全。 3. **消息传递**:Actor之间通过异步消息传递进行通信,这种模式避免了共享状态,降低...
Akka in Action shows you how to build message-oriented systems with Akka. This comprehensive, hands-on tutorial introduces each concept with a working example. You’ll start with the big picture of ...
Akka 2.10-2.314 版本是针对Scala 2.10.x平台的特定版本,它包含了该框架从2.10到2.3.14的改进和更新。 在Akka 2.3.14中,有几个关键的知识点值得深入探讨: 1. **Actor系统**:Akka的核心是Actor模型,它是一种...
2. Akka是为Java程序员提供的并发解决方案,兼容其他JVM语言。 3. Akka提供了大量易用的API,可以帮助开发者更高效地编写代码。 在【部分内容】中,提到了Akka文档的结构,包括: - 介绍了Akka的基本概念和教程。 ...
2. Streams:Akka Streams是一个处理数据流的API,支持背压,可以处理高吞吐量的实时数据流。 3. Persistence:允许Actor保存其状态并在故障后恢复,实现持久化。 4. Cluster:用于构建分布式应用,支持节点间的自动...
akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...
### Akka 学习入门实践知识点详解 #### 一、Akka 概述 - **定义**:Akka 是一个用于构建高度并发、分布式、容错性应用的工具包,适用于 Java 和 Scala 开发者。它基于 Actor 模型,支持响应式编程范式。 - **目标**...
### 2. 分布式数据(Distributed Data) Akka.NET 的分布式数据模块(Distributed Data)提供了一种在分布式环境中安全地共享和同步数据的方式。这个模块基于 CRDT(Conflict-free Replicated Data Types)算法,...
Akka是一个用Scala和Java编写的开源工具包和运行时,用于构建并发、分布式和容错的事件驱动应用程序。Akka基于Actor模型,这是一种并发模型,可以在多核CPU系统上有效地进行扩展。在给出的文件中,提到了Akka V2.3.6...
2. **Actor系统**:Akka的核心是Actor系统,它是一个包含多个相互协作的Actor的容器。每个Actor都有自己的状态和行为,并通过异步消息与其他Actor通信,降低了系统间的依赖。 3. **Actor模型**:Actor模型是一种...
第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...
Akka之所以使用Scala语言编写,是因为Scala的强大特性和其与JVM的无缝集成,使得Akka能够充分利用JVM的优势并发挥出Scala的函数式编程和面向对象编程的双重优势。同时,Akka也提供了对Java的全面支持,开发者可以...
#### 2. Akka的核心概念:Actor模型 - **Actor模型**:在Akka中,Actor是最基本的并发单元。每个Actor都独立运行并且通过消息传递与其他Actor进行通信。 - **Actor系统(Actor System)**:Actor系统的概念类似于...