`
xiefeifeihu
  • 浏览: 99800 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Akka2使用探索4(Actors)

阅读更多

  • Actor模型为编写并发和分布式系统提供了一种更高的抽象级别。它将开发人员从显式地处理锁和线程管理的工作中解脱出来,使编写并发和并行式系统更加容易。

    Akka Actor的API与Scala Actor类似,并且从Erlang中借用了一些语法。

     

    Actor类的定义

    定义一个Actor类需要继承UntypedActor,并实现onReceive方法。

     

    Props

    Props是一个用来在创建actor时指定选项的配置类。 以下是使用如何创建Props实例的示例.

    Props props1 = new Props();
    Props props2 = new Props(MyUntypedActor.class);
    Props props3 = new Props(new UntypedActorFactory() {
    public UntypedActor create() {
    return new MyUntypedActor();
    }
    });
    Props props4 = props1.withCreator(new UntypedActorFactory() {
    public UntypedActor create() {
    return new MyUntypedActor();
    }
    });

     

    使用Props创建Actor

    Actor可以通过将 Props 实例传入 actorOf 工厂方法来创建。

    ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");

     

    使用默认构造函数创建Actors

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
        ActorSystem system = ActorSystem.create("MySystem");
        ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");

    actorOf返回ActorRef实例,它是你创建的UntypedActor实例的句柄,可用它与实际的Actor交互。ActorRef是不可变的,它也是可序列化的,可用于网络传输,在远程主机上它仍然代表原节点上的同一个Actor。

    也可以通过actor的context上下文来创建,它被创建他的actor监管,系统创建的actor将成为顶级actor。

    public class FirstUntypedActor extends UntypedActor {
    ActorRef myActor = getContext().actorOf(new Props(MyActor.class), "myactor");

    name 参数是可选的, 但建议你为你的actor起一个合适的名字,因为它将在日志信息中被用于标识各个actor. 名字不可以为空,也不能以以$开头。如果给定的名字已经被赋给了同一个父actor的其它子actor,将会抛出InvalidActorNameException

    Actor 在创建后将自动异步地启动。当你创建UntypedActor时它会自动调用preStart回调方法,你可以重载preStart方法,加入初始化代码。

    注意:

    使用system.actorOf创建顶级actor是个阻塞操作,有可能发生死锁。避免方法就是不要在actors或futures内部使用默认的dispatcher调用system.actorOf方法。

     

    使用非缺省构造方法创建Actor

    如果UntypedActor的构造方法有参数,就不能用actorOf(new Props(clazz))创建了。需要使用new Props(new UntypedActorFactory() {..})创建,例子如下:

    // allows passing in arguments to the MyActor constructor
    ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
    return new MyActor("...");
    }
    }), "myactor");

     

    UntypedActor API

    UntypedActor只定义了一个抽象方法,就是上面提到的onReceive(Objectmessage), 用来实现actor的行为。

    如果当前actor的行为与收到的消息不匹配,则会调用unhandled方法, 它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)

    另外,它还包括:

  • getSelf()代表本actor的 ActorRef
  • getSender()代表最近收到的消息的发送actor,通常用于下面将讲到的回应消息中
  • supervisorStrategy()用户可重写它来定义对子actor的监管策略
  • getContext()暴露actor和当前消息的上下文信息,如:
  • 用于创建子actor的工厂方法 (actorOf)
  • actor所属的系统
  • 父监管者
  • 所监管的子actor
  • 生命周期监控
  • hotswap行为栈

    其余的可见方法是可以被用户重写的生命周期hook。

    public void preStart() { }

    public void preRestart(Throwable reason, Option message) { for (ActorRef each : getContext().getChildren()) getContext().stop(each); postStop(); }

    public void postRestart(Throwable reason) {
    preStart();
    }
    public void postStop() {
    }

     

    使用DeathWatch进行生命周期监控

    为了能在其它actor结束时(永久终止, 而不是临时的失败和重启)收到通知, actor可以将自己注册为其它actor在终止时所发布的 Terminated 消息的接收者. 这个服务是由actor系统的 DeathWatch 组件提供的。

    public static class WatchActor extends UntypedActor {
    final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
    {
    this.getContext().watch(child); // <-- this is the only call needed for registration
    }
    ActorRef lastSender = getContext().system().deadLetters();
    @Override
    public void onReceive(Object message) {
    if (message.equals("kill")) {
    getContext().stop(child);
    lastSender = getSender();
    } else if (message instanceof Terminated) {
    final Terminated t = (Terminated) message;
    if (t.getActor() == child) {
    lastSender.tell("finished");
    }
    } else {
    unhandled(message);
    }
    }
    }

    要注意 Terminated 消息的产生与注册和终止行为所发生的顺序无关。多次注册并不表示会有多个消息产生,也不保证有且只有一个这样的消息被接收到:如果被监控的actor已经生成了消息并且已经进入了队列, 在这个消息被处理之前又发生了另一次注册,则会有第二个消息进入队列,因为一个已经终止的actor注册监控器会立刻导致Terminated 消息的发生。

    可以使用 context.unwatch(target)来停止对另一个actor的生存状态的监控, 但很明显这不能保证不会接收到Terminated 消息因为该消息可能已经进入了队列。

     

    启动 Hook

    actor启动后,它的 preStart 会被立即执行。

    重启 Hook

    所有的Actor都是被监管的, i.e. 以某种失败处理策略与另一个actor链接在一起。 如果在处理一个消息的时候抛出的异常,Actor将被重启。这个重启过程包括上面提到的Hook:

  • 要被重启的actor的 preRestart 被调用,携带着导致重启的异常以及触发异常的消息; 如果重启并不是因为消息的处理而发生的,所携带的消息为 None , 例如,当一个监管者没有处理某个异常继而被它自己的监管者重启时。 这个方法是用来完成清理、准备移交给新的actor实例的最佳位置。 它的缺省实现是终止所有的子actor并调用 postStop.
  • 最初 actorOf 调用的工厂方法将被用来创建新的实例。
  • 新的actor的 postRestart 方法被调用,携带着导致重启的异常信息。 By default the preStart is called, just as in the normal start-up case.

    actor的重启会替换掉原来的actor对象; 重启不影响邮箱的内容, 所以对消息的处理将在 postRestart hook 返回后继续. 触发异常的消息不会被重新接收。在actor重启过程中所有发送到该actor的消息将象平常一样被放进邮箱队列中。

    终止 Hook

    一个Actor终止后,它的 postStop hook将被调用, 这可以用来取消该actor在其它服务中的注册. 这个hook保证在该actor的消息队列被禁止后才运行, i.e. 之后发给该actor的消息将被重定向到 ActorSystemdeadLetters 中。

     

    标识 Actor

    每个actor拥有一个唯一的逻辑路径, 此路径是由从actor系统的根开始的父子链构成;它还拥有一个物理路径,如果监管链包含有远程监管者,此路径可能会与逻辑路径不同。这些路径用来在系统中查找actor,例如,当收到一个远程消息时查找收件者, 但是它们的更直接的用处在于:actor可以通过指定绝对或相对路径(逻辑的或物理的)来查找其它的actor并随结果获取一个 ActorRef

    
    
  • context.actorFor("/user/serviceA/aggregator") // 查找绝对路径
  • context.actorFor("../joe") // 查找同一父监管者下的兄弟
  • 其中指定的路径被解释为一个 java.net.URI, 它以 / 分隔成路径段. 如果路径以 /开始, 表示一个绝对路径,从根监管者 ( "/user"的父亲)开始查找; 否则是从当前actor开始。如果某一个路径段为 .., 会找到当前所遍历到的actor的上一级, 否则则会向下一级寻找具有该名字的子actor。 必须注意的是 actor路径中的.. 总是表示逻辑结构,也就是其监管者。

    如果要查找的路径不存在,会返回一个特殊的actor引用,它的行为与actor系统的死信队列类似,但是保留其身份(i.e. 查找路径)。

    如果开启了远程调用,则远程actor地址也可以被查找。:

    
    
  • context.actorFor("akka://app@otherhost:1234/user/serviceB")
  • 这些查找动作立即返回一个(可能是远程的)actor引用, 所以你必须向它发送一个消息并等待其响应,来确认serviceB 是真正可访问和运行的。

     

    发送消息

    向actor发送消息是使用下列方法之一。

  • tell 意思是“fire-and-forget”, e.g. 异步发送一个消息并立即返回。这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。
  • ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。

    每一个消息发送者分别保证自己的消息的次序. try {
    String result = operation();
    getSender().tell(result);
    } catch (Exception e) {
    getSender().tell(new akka.actor.Status.Failure(e));
    throw e;
    }

     

    ask使用方式如下:

        List<Future<Object>> futures = []
            AkkaClientNoReply client = new AkkaClientNoReply("akka://xw@127.0.0.1:8888/user/server")
            client.send("hello")
            0.upto(15) {
                futures << akka.pattern.Patterns.ask(client.akkaClient, it, 1000 * 60)
            //模拟客户端给服务端发0——15消息,服务器处理(把数值+1返回给客户端)
            }
    
            final Future<Iterable<Object>> aggregate = Futures.sequence(futures, client.system.dispatcher());
            final Future<Integer> transformed = aggregate.map(new Mapper<Iterable<Object>, Integer>() {
                public Integer apply(Iterable<Object> coll) {
                    final Iterator<Object> it = coll.iterator();
                    int count = 0;
                    while (it.hasNext()) {
                        int x = (Integer) it.next();
                        count = count + x
                    }
                    return new Integer(count);
                }
            });
    
            AkkaServerApp app = new AkkaServerApp("resultHandler", "127.0.0.1", 6666, "result")
            app.messageProcessor = {msg, UntypedActorContext context ->
                log.info("1到16之和为" + msg)
            }
            app.startup()
    
            akka.pattern.Patterns.pipe(transformed).to(app.serverActor)

    如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。

     

    Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

    警告

    在使用future回调如 onComplete, onSuccess, and onFailure时, 在actor内部你要小心避免捕捉该actor的引用, i.e. 不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。

     

    转发消息

    你可以将消息从一个actor转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的actor时会很有用。

    myActor.forward(message, getContext());

     

    回应消息

    getSender().tell(replyMsg)

    如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为“dead-letter” actor的引用.

     

    初始化接收消息超时

    设置receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。

    public class MyReceivedTimeoutUntypedActor extends UntypedActor {
    public MyReceivedTimeoutUntypedActor() {
    getContext().setReceiveTimeout(Duration.parse("30 seconds"));
    }
    public void onReceive(Object message) {
    if (message.equals("Hello")) {
    getSender().tell("Hello world");
    } else if (message == Actors.receiveTimeout()) {
    throw new RuntimeException("received timeout");
    } else {
    unhandled(message);
    }
    }
    }

     

    终止Actor

    通过调用ActorRefFactory i.e. ActorContextActorSystemstop 方法来终止一个actor , 通常 context 用来终止子actor,而 system 用来终止顶级actor. 实际的终止操作是异步执行的, i.e. stop 可能在actor被终止之前返回。

    如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem死信, 但是这取决于邮箱的实现。

    actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己 (调用 postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated, 通知其监管者). 这个过程保证actor系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应 (i.e. 由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。

    ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。

    postStop hook 是在actor被完全终止以后调用。

     

    PoisonPill

    你也可以向actor发送 akka.actor.PoisonPill 消息, 这个消息处理完成后actor会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。

     

    优雅地终止

    如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:

    try {

    Future<Boolean> stopped = akka.pattern.Patterns.gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);

    Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));

    // the actor has been stopped

    } catch (ActorTimeoutException e) {

    // the actor wasn't stopped within 5 seconds

    }

     

    热拔插 Become/Unbecome

    升级 Upgrade

    Akka支持在运行时对Actor消息循环 (e.g. 的实现)进行实时替换: 在actor中调用 context.become 方法。

    Become 要求一个 akka.japi.Procedure 参数作为新的消息处理实现。 被替换的代码被存在一个栈中,可以被push和pop。

    降级

    由于被热替换掉的代码存在栈中,你也可以对代码进行降级,只需要在actor中调用 context.unbecome 方法。

     

    Killing actor

    发送Kill消息给actor

     

    Actor 与 异常

    在消息被actor处理的过程中可能会抛出异常,例如数据库异常。

    消息会怎样

    如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。

    邮箱会怎样

    如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。

    actor会怎样

    如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。 

     

  • 分享到:
    评论

    相关推荐

      akka-in-action完整源代码

      - **分布式数据**:探索Akka分布式数据模块(Distributed Data),了解如何在多节点间共享和同步数据。 5. 高级话题: - **CQRS和Event Sourcing**:结合Akka实现命令查询责任分离(CQRS)和事件溯源(Event ...

      akka型分布式状态博客:Lightbend博客文章的同伴回购-如何使用Akka Cluster分配应用程序状态

      总的来说,通过使用Akka Cluster、Akka Persistence、Akka Typed Actors以及Cluster Sharding,开发人员可以构建出能够在大规模分布式环境中高效运行、容错性强的应用程序。同时,结合现代云原生技术如Docker和...

      akka型团队:与Akka Typed,Akka Cluster Sharding和Cassandra结合在一起的未打磨宠物项目

      在这个未完成的项目中,开发者可能已经探索了如何创建一个Akka Typed Actor系统,并在集群中使用Cluster Sharding策略分布actor。同时,他们可能已经实现了与Cassandra的数据交互,包括数据的存取、查询以及事务管理...

      FauxBanque:Akka.net自学

      FauxBanque 使用 C# 作为主要开发语言,这使得 .NET 开发人员可以充分利用其强大的类型系统和语法特性来构建复杂的 Akka.NET 应用。 3. **Actors 的创建与交互** 在 FauxBanque 项目中,开发者会学习如何定义演员...

      akka-stream-introduction:我的演讲“ Akka Stream简介”的源代码

      4. **Type Safety**: Akka Stream使用Scala的类型系统确保数据转换的正确性,编译时就能发现潜在的问题。 5. **Integration**: Akka Stream与Akka其他模块(如Actors和HTTP)紧密集成,使得构建完整的分布式系统变...

      macwire-akka-activator:使用 MacWire 和 Akka Activator 进行无框架依赖注入

      然后,在启动Akka系统时,我们可以使用`MacwireInjector`来创建ActorSystem和Actors,就像这样: ```scala val injector = newInjector(new MyModule) val system = ActorSystem("mySystem", injector) val myActor...

      akka-intro-hands-on-slides

      2. **轻量级 Actors**:Akka 中的 Actors 是轻量级的,它们运行在自己的执行上下文中,有效地利用了硬件资源。每个 Actor 都有其独立的堆内存,减少了竞态条件的可能性。 3. **消息传递**:Actors 之间的通信是异步...

      bakka:使用 akka 管理并发的游乐场

      #Banking with akka 在银行示例中探索不同并发选择的游乐场##Compiling 您可以通过运行activator ui从浏览器访问此代码。 您还可以使用 IntelliJ 社区版 + Scala 插件打开项目。 要在 Eclipse Scala IDE 中打开,请...

      scala例子 实例

      5. Actors模型:Scala内置了Akka框架,其Actors模型为并发和分布式计算提供了简单且强大的解决方案。 6. 强大的集合库:Scala的集合库提供了丰富的数据结构,如List、Set、Map,以及高效的转换操作,如map、filter...

      tlug-akka-workshop:我为Teh-lug(Linux用户组-Tehran)社区提供了一个简单的代码,其中介绍了“ Actor模型”

      《Akka Actor模型在Teh-lug工作坊中的探索》 在编程领域,尤其是并发处理和分布式计算中,Actor模型已经成为了重要的理论基础。本文将深入探讨在Teh-lug(Linux用户组-Tehran)社区中进行的一次工作坊,主题是Akka...

      scala 2.13.8 安装包。。。。。。。。

      5. ** Actors模型**:Scala集成了Akka库,其中的Actors模型为构建并发和分布式应用提供了强大的工具。Actors通过消息传递来通信,避免了共享状态的复杂性。 6. **集合库**:Scala的集合库是一大亮点,它提供了一...

      scala-2.11.8.tar.gz

      5. ** Actors模型**:Scala内建对Akka框架的支持,其中Actors模型提供了一种处理并发和分布式计算的方式。Actors通过消息传递来通信,确保线程安全。 6. ** REPL(Read-Eval-Print Loop)**:Scala提供了交互式的...

      scala-2.12.7.zip

      5. ** Actors模型**:Scala内置对Akka Actors的支持,这是一种用于构建并发和分布式系统的模型,通过消息传递来确保线程安全。 6. **集合库**:Scala的集合库是其强大功能的一部分,提供了丰富的数据结构,如List、...

      scala-2.10.4

      5. ** Actors模型**:Scala内置了Akka框架,其中Actors模型提供了轻量级的线程和消息传递机制,非常适合构建并发和分布式应用。 6. **集合库**:Scala的集合库是其强大功能的一部分,提供了丰富的操作和转换,如map...

      scala0000000000

      4. ** Actors模型**:Scala内置对Akka框架的支持,Akka是一个用于构建并发和分布式系统的库,它基于Actors模型,使得异步编程更加简单和高效。 5. **对象编程**:Scala是面向对象的语言,类、对象和继承是其核心...

      scala编程入门教材

      Scala编程入门教材旨在引导初学者踏入Scala这一强大且多用途的...这将为你打下坚实的基础,进一步探索更高级的主题,如响应式编程、分布式系统设计或大数据处理工具(如Spark),这些都广泛使用Scala作为主要编程语言。

      atomic-scala-examples

      7. ** Actors 和 Concurrency**:Scala对Akka框架的支持,利用actors进行并发和分布式计算。 8. **泛型**:类型参数化,使代码更具通用性和可复用性。 9. **隐式转换**:Scala的隐式转换可以将一种类型的对象透明...

      Scala编程 (完整版)带书签目录

      5. ** Actors模型**:Scala内置对Akka框架的支持,Akka使用Actors模型进行并发处理,这是一种轻量级的线程模型,能够有效管理并发和分布式系统的复杂性。 6. **集合库**:Scala的集合库非常强大,提供了丰富的操作...

      Scala编程完整版

      7. ** Actors 和 Concurrency**:Scala支持Akka框架,其中Actors模型提供了并发和分布式计算的解决方案。Actors是独立运行的实体,通过消息传递进行通信,有助于构建可扩展的系统。 8. ** Scalactic 和 ScalaTest**...

    Global site tag (gtag.js) - Google Analytics