`
xiefeifeihu
  • 浏览: 99313 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Akka2使用探索7——“云计算”示例(Actor、Future、Remoting、Router、Deploy、异步、并发使用Demo)

阅读更多

假设有一个很耗时的运算,单台机器已经没法满足需求,这时你可以想到由多台计算机协作完成。具体怎么做呢。

举个很简单的例子,假设这个耗时的运算是从1加到100000,你现在有两台服务器,可以让这两台服务器分别完成从1加到50000,和从50001加到100000,然后本机完成这两个结果之和。

 

两台服务器分别启动两个akka Server,同时还有一个CalcActor。这个计算actor接收两个参数:Integer start和Integer end,可以从start一直加到end,最后将结果返回给发送者:getSender().tell(result)。

@Log4j
class CalcActor extends UntypedActor {
    @Override
    void onReceive(Object message) {
        log.debug "CalcActor received: ${message}----self:${getSelf()},sender:${getSender()}"
        if (message instanceof String) {
            String[] args = message.split(",")
            int start = Integer.parseInt(args[0])
            int end = Integer.parseInt(args[1])
            int result = 0
            println("start calc:" + start + " upto " + end)
            start.upto(end) {
                result += it
            }
            sleep(5000) //模拟还要额外耗时5秒
            println("result:" + result)
            getSender().tell(result)
        } else {
            unhandled(message)
        }
    }
}
两个服务器分别为:
AkkaServerApp serverA = new AkkaServerApp("sc", "10.68.3.122", 8888, "calc") //AkkaSystemName为sc,ip为10.68.3.122,端口为8888,serviceName为calc。

    AkkaServerApp serverA = new AkkaServerApp("sp", "10.68.3.124", 8888, "calc")//AkkaSystemName为sp,ip为10.68.3.124,端口为8888,serviceName为calc。

主要的代码在客户端:
    public static void main(String[] args) throws Exception {

        final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");//客户端akka配置
        ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");//将CalcActor发布到远程10.68.3.122上
        ActorRef remoteCalcA2 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA2");//将CalcActor发布到远程10.68.3.124上

        final List<Future<Double>> frs = new ArrayList<Future<Double>>();//异步返回结果Future存放在list中
//tell只请求,是否响应它完全不知道。ask是请求,并明确知道未来会相应。        
//        remoteCalcA.tell("1,10000", app.getServerActor());
//        remoteCalcB.tell("10001,20000", app.getServerActor());
        Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,50000", 150000);//让远程122计算从1加到50000,超时时间为150秒
        Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "50001,100000", 150000);//并发地让远程124计算从50001加到100000,超时时间150秒
        frs.add(f1);
        frs.add(f2);

        Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());将未来返回的结果转换成Future<Iterable<Double>>
        Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {

            @Override
            public Double apply(Iterable<Double> parameter) {
                Double result = 0d;
                for (Double s : parameter) {//计算两个服务器返回的结果
                    result += s;
                }
                return result;
            }

        });

        fr.onSuccess(new OnSuccess<Double>() {
            @Override
            public void onSuccess(Double result) {
                System.out.println("云计算返回结果-----" + result);
            }
        });
    }

 

还可以让服务器并发处理:把给从1加到50000的任务分成5个线程并行处理:1..10000,10001..20000,20001..30000,30001..40000,40001..50000,这样能更好地提高效率。

如果按上面的方法仅仅是发布多个remote actor:

ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");

是没法提高效率的,因为这时的CalcActor是单线程的,它只会先接收1..10000,处理完后再接收10001..20000并处理。。。。。

使其能够并行处理很简单,创建remoteActor时加上withRoute即可:

 

ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(5)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");  //RoundRobinRouter的参数5可以理解为分配5个线程并行处理

代码跟上面基本相同
    public static void main(String[] args) throws Exception {

        final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");
        ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");    
        ActorRef remoteCalcB1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sp", "10.68.3.124", 8888)))), "clientCalcB1");
   
        final List<Future<Double>> frs = new ArrayList<Future<Double>>();
        
        Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,10000", 150000);
        Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "10001,20000", 150000);
        Future f3 = akka.pattern.Patterns.ask(remoteCalcA1, "20001,30000", 150000);
        Future f4 = akka.pattern.Patterns.ask(remoteCalcA1, "30001,40000", 150000);
        Future f5 = akka.pattern.Patterns.ask(remoteCalcB1, "40001,50000", 150000);
        Future f6 = akka.pattern.Patterns.ask(remoteCalcB1, "50001,60000", 150000);
        Future f7 = akka.pattern.Patterns.ask(remoteCalcB1, "60001,70000", 150000);
        Future f8 = akka.pattern.Patterns.ask(remoteCalcB1, "70001,80000", 150000);
        frs.add(f1);
        frs.add(f2);
        frs.add(f3);
        frs.add(f4);
        frs.add(f5);
        frs.add(f6);
        frs.add(f7);
        frs.add(f8);

        Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());
        Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {

            @Override
            public Double apply(Iterable<Double> parameter) {
                Double result = 0d;
                for (Double s : parameter) {
                    result += s;
                }
                return result;
            }

        });

        fr.onSuccess(new OnSuccess<Double>() {
            @Override
            public void onSuccess(Double result) {
                System.out.println("云计算返回从1加到80000的结果-----" + result);
            }
        });
    }

 

0
5
分享到:
评论
1 楼 ZZX19880809 2014-12-29  
啥ji巴玩意,copy也不看下,这啥样子了

相关推荐

    用Scala写的akka actor简单demo

    标题中的“用Scala写的akka actor简单demo”指的是一个使用Scala编程语言编写的Akka Actor系统的基本示例。Akka是轻量级、基于actor模型的框架,它用于构建高度并发、分布式和容错的应用程序。Scala是多范式编程语言...

    akka-actor-1.1.2.jar.zip

    尤其值得一提的是其核心组件——Akka Actor,它是实现异步、反应式编程的关键。本文将深入探讨Akka Actor 1.1.2版本,以及如何利用jar包进行项目集成。 Akka Actor系统是基于消息传递的并发模型,它将Actor作为计算...

    响应式架构 消息模式Actor实现与Scala.Akka应用集成

    7. 测试Actor系统:学习如何编写测试用例来验证Actor的正确性,使用如ScalaTest或Akka TestKit等工具。 8. 实战案例:分析实际项目中的示例,如何将Akka应用于Web服务、大数据处理或其他并发场景。 通过以上知识点...

    akka-actor-1.0-RC2.jar.zip

    《Akka Actor库详解——基于akka-actor-1.0-RC2.jar.zip的剖析》 Akka是一个由Lightbend公司维护的开源框架,主要用于构建高度并发、分布式和反应式的应用程序。在Java和Scala平台上,Akka因其强大的性能和易用性而...

    akka-actor_2.11 jar包

    akka-actor_2.11 jar包

    c#分布式框架akka范例

    在**akkadotnet-code-samples-master**这个压缩包中,你可能会找到不同类型的示例代码,如基本Actor的创建和交互,Actor的远程部署,集群配置,以及使用流处理数据等。这些示例将帮助你理解和掌握如何在实际项目中...

    akka-in-action完整示例源代码

    2. **Actor模型**:Actor模型是一种处理并发问题的理论模型,它通过隔离状态和并发操作来避免数据竞争。在Akka中,每个Actor都是独立的,有自己的生命周期和状态,这使得系统更易于理解和调试。 3. **持久化**:...

    java餐饮管理源码-akka:使用gradle搭建akkademo,语言是scala

    Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。 通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发...

    akka-typed-actor-1.0-RC2.jar.zip

    《Akka Typed Actor详解——基于1.0-RC2.jar版本》 在Java和Scala的世界里,Akka是一个强大的工具库,它提供了构建高度可扩展、反应式应用程序的框架。Akka Typed Actor是Akka框架中的一个重要组件,它是对传统...

    akka2.0使用方法

    2. **Actor**:Actor是一种并发实体,通过消息传递进行通信。每个Actor都有自己的邮箱,用于接收其他Actor发送的消息。创建Actor可以通过`system.actorOf(Props[MyActorClass])`,其中`MyActorClass`是你定义的Actor...

    响应式架构++消息模式Actor实现与Scala.Akka应用集成+,沃恩·弗农+

    2. **消息传递**:讲解Actor之间的通信方式,包括发送消息的语法、异步消息处理以及如何处理消息队列。 3. **状态管理**:讨论Actor如何维护自身的状态,以及如何通过消息来改变状态,确保线程安全。 4. **监督...

    Akka Scala 学习高清原版pdf

    标签“Akka scala 并发 actor”则进一步具体化了文档内容,即深入探讨了Akka中的actor并发模型。 文档内容涉及到以下几个核心知识点: 1. 概述和入门: - Akka是什么?它是一个开源的工具集,用来简化在Java...

    Akka Actor Tutorial代码

    在Scala中使用Akka Actor可以利用其强大的类型系统和并发能力,使得编写高性能的并发程序变得更加简单。 **Actor系统** Akka中的Actor系统是所有Actor的容器和管理器。它负责Actor的生命周期,包括创建、调度、...

    akka-actor_2.12 jar包

    akka-actor_2.12 jar包

    akka实例参考

    Akka 是一个强大的开源工具,主要用于构建高度并发、分布式和反应式的应用程序,主要在Java和Scala语言环境下使用。它的核心设计理念是Actor模型,这使得它能够处理大量的并发操作,同时保持系统的可扩展性和容错性...

    Akka Actor模型开发库 v2.9.1.zip

    Akka Actor模型是一种高效、可扩展的并发编程模型,它源于Scala编程语言并被广泛应用于分布式系统中。Akka库的v2.9.1版本提供了丰富的功能和优化,为开发者构建高度并发、容错和反应式的应用程序提供了一个强大的...

    Akka简介.pptx

    Akka 是一个强大的工具包,尤其在大数据处理和高并发场景下,它的核心特性在于其基于Actor模型的设计。Akka 是由Scala语言构建的,但同时也提供了与Java友好的API,使得Java开发者也能轻松利用其功能。由于Scala是...

    akka java实现tcp远程调用

    Akka 是一个强大的工具包和框架,主要用于构建高度并发、分布式和反应式的应用程序,它基于actor模型。在Java中,Akka可以用来实现TCP远程调用,这使得不同系统之间能够通过网络进行通信。下面我们将深入探讨如何...

Global site tag (gtag.js) - Google Analytics