`
m635674608
  • 浏览: 5043615 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Akka2

    博客分类:
  • java
 
阅读更多

假设有一个很耗时的运算,单台机器已经没法满足需求,这时你可以想到由多台计算机协作完成。具体怎么做呢。

举个很简单的例子,假设这个耗时的运算是从1加到100000,你现在有两台服务器,可以让这两台服务器分别完成从1加到50000,和从50001加到100000,然后本机完成这两个结果之和。

 

两台服务器分别启动两个akka Server,同时还有一个CalcActor。这个计算actor接收两个参数:Integer start和Integer end,可以从start一直加到end,最后将结果返回给发送者:getSender().tell(result)。

@Log4j
class CalcActor extends UntypedActor {
    @Override
    void onReceive(Object message) {
        log.debug "CalcActor received: ${message}----self:${getSelf()},sender:${getSender()}"
        if (message instanceof String) {
            String[] args = message.split(",")
            int start = Integer.parseInt(args[0])
            int end = Integer.parseInt(args[1])
            double result = 0d
            println("start calc:" + start + " upto " + end)
            start.upto(end) {
                result += it
            }
            sleep(5000) //模拟还要额外耗时5秒
            println("result:" + result)
            getSender().tell(result)
        } else {
            unhandled(message)
        }
    }
}
两个服务器分别为:
AkkaServerApp serverA = new AkkaServerApp("sc", "10.68.3.122", 8888, "calc") //AkkaSystemName为sc,ip为10.68.3.122,端口为8888,serviceName为calc。

    AkkaServerApp serverA = new AkkaServerApp("sp", "10.68.3.124", 8888, "calc")//AkkaSystemName为sp,ip为10.68.3.124,端口为8888,serviceName为calc。

主要的代码在客户端:
    public static void main(String[] args) throws Exception {

        final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");//客户端akka配置
        ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");//将CalcActor发布到远程10.68.3.122上
        ActorRef remoteCalcA2 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA2");//将CalcActor发布到远程10.68.3.124上

        final List<Future<Double>> frs = new ArrayList<Future<Double>>();//异步返回结果Future存放在list中
//tell只请求,是否响应它完全不知道。ask是请求,并明确知道未来会相应。        
//        remoteCalcA.tell("1,10000", app.getServerActor());
//        remoteCalcB.tell("10001,20000", app.getServerActor());
        Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,50000", 150000);//让远程122计算从1加到50000,超时时间为150秒
        Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "50001,100000", 150000);//并发地让远程124计算从50001加到100000,超时时间150秒
        frs.add(f1);
        frs.add(f2);

        Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());将未来返回的结果转换成Future<Iterable<Double>>
        Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {

            @Override
            public Double apply(Iterable<Double> parameter) {
                Double result = 0d;
                for (Double s : parameter) {//计算两个服务器返回的结果
                    result += s;
                }
                return result;
            }

        });

        fr.onSuccess(new OnSuccess<Double>() {
            @Override
            public void onSuccess(Double result) {
                System.out.println("云计算返回结果-----" + result);
            }
        });
    }

 

还可以让服务器并发处理:把给从1加到50000的任务分成5个线程并行处理:1..10000,10001..20000,20001..30000,30001..40000,40001..50000,这样能更好地提高效率。

如果按上面的方法仅仅是发布多个remote actor:

ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");

是没法提高效率的,因为这时的CalcActor是单线程的,它只会先接收1..10000,处理完后再接收10001..20000并处理。。。。。

使其能够并行处理很简单,创建remoteActor时加上withRoute即可:

ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(5)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");  //RoundRobinRouter的参数5可以理解为分配5个线程并行处理


代码跟上面基本相同 

    public static void main(String[] args) throws Exception {

        final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");
        ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");    
        ActorRef remoteCalcB1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sp", "10.68.3.124", 8888)))), "clientCalcB1");
   
        final List<Future<Double>> frs = new ArrayList<Future<Double>>();
        
        Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,10000", 150000);
        Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "10001,20000", 150000);
        Future f3 = akka.pattern.Patterns.ask(remoteCalcA1, "20001,30000", 150000);
        Future f4 = akka.pattern.Patterns.ask(remoteCalcA1, "30001,40000", 150000);
        Future f5 = akka.pattern.Patterns.ask(remoteCalcB1, "40001,50000", 150000);
        Future f6 = akka.pattern.Patterns.ask(remoteCalcB1, "50001,60000", 150000);
        Future f7 = akka.pattern.Patterns.ask(remoteCalcB1, "60001,70000", 150000);
        Future f8 = akka.pattern.Patterns.ask(remoteCalcB1, "70001,80000", 150000);
        frs.add(f1);
        frs.add(f2);
        frs.add(f3);
        frs.add(f4);
        frs.add(f5);
        frs.add(f6);
        frs.add(f7);
        frs.add(f8);

        Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());
        Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {

            @Override
            public Double apply(Iterable<Double> parameter) {
                Double result = 0d;
                for (Double s : parameter) {
                    result += s;
                }
                return result;
            }

        });

        fr.onSuccess(new OnSuccess<Double>() {
            @Override
            public void onSuccess(Double result) {
                System.out.println("云计算返回从1加到80000的结果-----" + result);
            }
        });
    }


http://m.oschina.net/blog/81118
分享到:
评论

相关推荐

    Akka 实战 akka in action v13 2014版本

    Akka是一个用于构建分布式、高并发、容错性应用程序的开源库和运行时环境,它建立在Actor模型上。在2014年版本的《Akka实战》一书中,作者深入讲解了Akka的核心概念、设计思想以及如何利用Akka构建实际的应用程序。 ...

    FIABTurn​​table:使用Guice + Akka C2库(C2Akka)的FIABTurn​​table实现

    2. **Akka C2**(Component Management for Akka):Akka 是一个用 Scala 编写的开源框架,用于构建高度并行、分布式和反应式的应用程序。C2(Component Container)是 Akka 的一个扩展,它提供了更高级别的组件管理...

    Learning Akka(PACKT,2015)

    Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...

    c#分布式框架akka范例

    2. **消息传递**:Actor之间的通信是通过发送和接收消息进行的,这些消息可以是任何.NET对象。Akka.NET提供了`IActorRef`接口,代表了一个Actor的引用,用于发送消息。使用`Tell`方法非阻塞地发送消息,而`Ask`方法...

    akka java实现tcp远程调用

    2. Akka ActorSystem:创建和配置ActorSystem以支持远程调用。 3. 配置文件:编写`application.conf`以启用远程部署和指定TCP设置。 4. 服务端和客户端Actor:定义Actor类,实现消息处理逻辑。 5. 远程Actor引用:在...

    akka实例参考

    2. **Actor**:Actor是Akka的基本执行单元,它通过消息传递与外界通信,每个Actor都有自己的邮箱来接收消息,保证了线程安全。 3. **消息传递**:Actor之间通过异步消息传递进行通信,这种模式避免了共享状态,降低...

    Akka.in.Action.2016.9.pdf

    Akka in Action shows you how to build message-oriented systems with Akka. This comprehensive, hands-on tutorial introduces each concept with a working example. You’ll start with the big picture of ...

    akka_2.10-2.314

    Akka 2.10-2.314 版本是针对Scala 2.10.x平台的特定版本,它包含了该框架从2.10到2.3.14的改进和更新。 在Akka 2.3.14中,有几个关键的知识点值得深入探讨: 1. **Actor系统**:Akka的核心是Actor模型,它是一种...

    Akka开发库文档

    2. Akka是为Java程序员提供的并发解决方案,兼容其他JVM语言。 3. Akka提供了大量易用的API,可以帮助开发者更高效地编写代码。 在【部分内容】中,提到了Akka文档的结构,包括: - 介绍了Akka的基本概念和教程。 ...

    akka_2.12-2.4.18.zip

    2. Streams:Akka Streams是一个处理数据流的API,支持背压,可以处理高吞吐量的实时数据流。 3. Persistence:允许Actor保存其状态并在故障后恢复,实现持久化。 4. Cluster:用于构建分布式应用,支持节点间的自动...

    akka-kryo-serialization, 基于Kryo的Akka序列化.zip

    akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...

    akka学习入门实践

    ### Akka 学习入门实践知识点详解 #### 一、Akka 概述 - **定义**:Akka 是一个用于构建高度并发、分布式、容错性应用的工具包,适用于 Java 和 Scala 开发者。它基于 Actor 模型,支持响应式编程范式。 - **目标**...

    Akka.net分布式数据传输

    ### 2. 分布式数据(Distributed Data) Akka.NET 的分布式数据模块(Distributed Data)提供了一种在分布式环境中安全地共享和同步数据的方式。这个模块基于 CRDT(Conflict-free Replicated Data Types)算法,...

    AkkaJava PDF Doc

    Akka是一个用Scala和Java编写的开源工具包和运行时,用于构建并发、分布式和容错的事件驱动应用程序。Akka基于Actor模型,这是一种并发模型,可以在多核CPU系统上有效地进行扩展。在给出的文件中,提到了Akka V2.3.6...

    Learning Akka

    2. **Actor系统**:Akka的核心是Actor系统,它是一个包含多个相互协作的Actor的容器。每个Actor都有自己的状态和行为,并通过异步消息与其他Actor通信,降低了系统间的依赖。 3. **Actor模型**:Actor模型是一种...

    Akka 基础学习pdf中文文档

    第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...

    akka框架,应用于scala

    Akka之所以使用Scala语言编写,是因为Scala的强大特性和其与JVM的无缝集成,使得Akka能够充分利用JVM的优势并发挥出Scala的函数式编程和面向对象编程的双重优势。同时,Akka也提供了对Java的全面支持,开发者可以...

    scala akka

    #### 2. Akka的核心概念:Actor模型 - **Actor模型**:在Akka中,Actor是最基本的并发单元。每个Actor都独立运行并且通过消息传递与其他Actor进行通信。 - **Actor系统(Actor System)**:Actor系统的概念类似于...

Global site tag (gtag.js) - Google Analytics