`

Akka/Scala example source code file (ActorDocSpec.scala)

 
阅读更多

from:http://alvinalexander.com/java/jwarehouse/akka-2.3/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala.shtml

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.actor

import language.postfixOps

//#imports1
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

//#imports1

import scala.concurrent.Future
import akka.actor.{ ActorRef, ActorSystem, PoisonPill, Terminated, ActorLogging }
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.Matchers
import akka.testkit._
import akka.util._
import scala.concurrent.duration._
import scala.concurrent.Await

//#my-actor
class MyActor extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case "test" => log.info("received test")
    case _      => log.info("received unknown message")
  }
}
//#my-actor

final case class DoIt(msg: ImmutableMessage)
final case class Message(s: String)

//#context-actorOf
class FirstActor extends Actor {
  val child = context.actorOf(Props[MyActor], name = "myChild")
  //#plus-some-behavior
  def receive = {
    case x => sender() ! x
  }
  //#plus-some-behavior
}
//#context-actorOf

class ActorWithArgs(arg: String) extends Actor {
  def receive = { case _ => () }
}

class DemoActorWrapper extends Actor {
  //#props-factory
  object DemoActor {
    /**
     * Create Props for an actor of this type.
     * @param magciNumber The magic number to be passed to this actor鈥檚 constructor.
     * @return a Props for creating this actor, which can then be further configured
     *         (e.g. calling `.withDispatcher()` on it)
     */
    def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
  }

  class DemoActor(magicNumber: Int) extends Actor {
    def receive = {
      case x: Int => sender() ! (x + magicNumber)
    }
  }

  class SomeOtherActor extends Actor {
    // Props(new DemoActor(42)) would not be safe
    context.actorOf(DemoActor.props(42), "demo")
    // ...
    //#props-factory
    def receive = {
      case msg =>
    }
    //#props-factory
  }
  //#props-factory

  def receive = Actor.emptyBehavior
}

class Hook extends Actor {
  var child: ActorRef = _
  //#preStart
  override def preStart() {
    child = context.actorOf(Props[MyActor], "child")
  }
  //#preStart
  def receive = Actor.emptyBehavior
  //#postStop
  override def postStop() {
    //#clean-up-some-resources
    ()
    //#clean-up-some-resources
  }
  //#postStop
}

class ReplyException extends Actor {
  def receive = {
    case _ =>
      //#reply-exception
      try {
        val result = operation()
        sender() ! result
      } catch {
        case e: Exception =>
          sender() ! akka.actor.Status.Failure(e)
          throw e
      }
    //#reply-exception
  }

  def operation(): String = { "Hi" }

}

//#gracefulStop-actor
object Manager {
  case object Shutdown
}

class Manager extends Actor {
  import Manager._
  val worker = context.watch(context.actorOf(Props[Cruncher], "worker"))

  def receive = {
    case "job" => worker ! "crunch"
    case Shutdown =>
      worker ! PoisonPill
      context become shuttingDown
  }

  def shuttingDown: Receive = {
    case "job" => sender() ! "service unavailable, shutting down"
    case Terminated(`worker`) =>
      context stop self
  }
}
//#gracefulStop-actor

class Cruncher extends Actor {
  def receive = {
    case "crunch" => // crunch...
  }
}

//#swapper
case object Swap
class Swapper extends Actor {
  import context._
  val log = Logging(system, this)

  def receive = {
    case Swap =>
      log.info("Hi")
      become({
        case Swap =>
          log.info("Ho")
          unbecome() // resets the latest 'become' (just for fun)
      }, discardOld = false) // push on top instead of replace
  }
}

object SwapperApp extends App {
  val system = ActorSystem("SwapperSystem")
  val swap = system.actorOf(Props[Swapper], name = "swapper")
  swap ! Swap // logs Hi
  swap ! Swap // logs Ho
  swap ! Swap // logs Hi
  swap ! Swap // logs Ho
  swap ! Swap // logs Hi
  swap ! Swap // logs Ho
}
//#swapper

//#receive-orElse

trait ProducerBehavior {
  this: Actor =>

  val producerBehavior: Receive = {
    case GiveMeThings =>
      sender() ! Give("thing")
  }
}

trait ConsumerBehavior {
  this: Actor with ActorLogging =>

  val consumerBehavior: Receive = {
    case ref: ActorRef =>
      ref ! GiveMeThings

    case Give(thing) =>
      log.info("Got a thing! It's {}", thing)
  }
}

class Producer extends Actor with ProducerBehavior {
  def receive = producerBehavior
}

class Consumer extends Actor with ActorLogging with ConsumerBehavior {
  def receive = consumerBehavior
}

class ProducerConsumer extends Actor with ActorLogging
  with ProducerBehavior with ConsumerBehavior {

  def receive = producerBehavior orElse consumerBehavior
}

// protocol
case object GiveMeThings
final case class Give(thing: Any)

//#receive-orElse

class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {

  "import context" in {
    new AnyRef {
      //#import-context
      class FirstActor extends Actor {
        import context._
        val myActor = actorOf(Props[MyActor], name = "myactor")
        def receive = {
          case x => myActor ! x
        }
      }
      //#import-context

      val first = system.actorOf(Props(classOf[FirstActor], this), name = "first")
      system.stop(first)
    }
  }

  "creating actor with system.actorOf" in {
    val myActor = system.actorOf(Props[MyActor])

    // testing the actor

    // TODO: convert docs to AkkaSpec(Map(...))
    val filter = EventFilter.custom {
      case e: Logging.Info => true
      case _               => false
    }
    system.eventStream.publish(TestEvent.Mute(filter))
    system.eventStream.subscribe(testActor, classOf[Logging.Info])

    myActor ! "test"
    expectMsgPF(1 second) { case Logging.Info(_, _, "received test") => true }

    myActor ! "unknown"
    expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") => true }

    system.eventStream.unsubscribe(testActor)
    system.eventStream.publish(TestEvent.UnMute(filter))

    system.stop(myActor)
  }

  "creating a Props config" in {
    //#creating-props
    import akka.actor.Props

    val props1 = Props[MyActor]
    val props2 = Props(new ActorWithArgs("arg")) // careful, see below
    val props3 = Props(classOf[ActorWithArgs], "arg")
    //#creating-props

    //#creating-props-deprecated
    // NOT RECOMMENDED within another actor:
    // encourages to close over enclosing class
    val props7 = Props(new MyActor)
    //#creating-props-deprecated
  }

  "creating actor with Props" in {
    //#system-actorOf
    import akka.actor.ActorSystem

    // ActorSystem is a heavy object: create only one per application
    val system = ActorSystem("mySystem")
    val myActor = system.actorOf(Props[MyActor], "myactor2")
    //#system-actorOf
    shutdown(system)
  }

  "creating actor with IndirectActorProducer" in {
    class Echo(name: String) extends Actor {
      def receive = {
        case n: Int => sender() ! name
        case message =>
          val target = testActor
          //#forward
          target forward message
        //#forward
      }
    }

    val a: { def actorRef: ActorRef } = new AnyRef {
      val applicationContext = this

      //#creating-indirectly
      import akka.actor.IndirectActorProducer

      class DependencyInjector(applicationContext: AnyRef, beanName: String)
        extends IndirectActorProducer {

        override def actorClass = classOf[Actor]
        override def produce =
          //#obtain-fresh-Actor-instance-from-DI-framework
          new Echo(beanName)

        def this(beanName: String) = this("", beanName)
        //#obtain-fresh-Actor-instance-from-DI-framework
      }

      val actorRef = system.actorOf(
        Props(classOf[DependencyInjector], applicationContext, "hello"),
        "helloBean")
      //#creating-indirectly
    }
    val actorRef = {
      import scala.language.reflectiveCalls
      a.actorRef
    }

    val message = 42
    implicit val self = testActor
    //#tell
    actorRef ! message
    //#tell
    expectMsg("hello")
    actorRef ! "huhu"
    expectMsg("huhu")
  }

  "using implicit timeout" in {
    val myActor = system.actorOf(Props[FirstActor])
    //#using-implicit-timeout
    import scala.concurrent.duration._
    import akka.util.Timeout
    import akka.pattern.ask
    implicit val timeout = Timeout(5 seconds)
    val future = myActor ? "hello"
    //#using-implicit-timeout
    Await.result(future, timeout.duration) should be("hello")

  }

  "using explicit timeout" in {
    val myActor = system.actorOf(Props[FirstActor])
    //#using-explicit-timeout
    import scala.concurrent.duration._
    import akka.pattern.ask
    val future = myActor.ask("hello")(5 seconds)
    //#using-explicit-timeout
    Await.result(future, 5 seconds) should be("hello")
  }

  "using receiveTimeout" in {
    //#receive-timeout
    import akka.actor.ReceiveTimeout
    import scala.concurrent.duration._
    class MyActor extends Actor {
      // To set an initial delay
      context.setReceiveTimeout(30 milliseconds)
      def receive = {
        case "Hello" =>
          // To set in a response to a message
          context.setReceiveTimeout(100 milliseconds)
        case ReceiveTimeout =>
          // To turn it off
          context.setReceiveTimeout(Duration.Undefined)
          throw new RuntimeException("Receive timed out")
      }
    }
    //#receive-timeout
  }

  //#hot-swap-actor
  class HotSwapActor extends Actor {
    import context._
    def angry: Receive = {
      case "foo" => sender() ! "I am already angry?"
      case "bar" => become(happy)
    }

    def happy: Receive = {
      case "bar" => sender() ! "I am already happy :-)"
      case "foo" => become(angry)
    }

    def receive = {
      case "foo" => become(angry)
      case "bar" => become(happy)
    }
  }
  //#hot-swap-actor

  "using hot-swap" in {
    val actor = system.actorOf(Props(classOf[HotSwapActor], this), name = "hot")
  }

  "using Stash" in {
    //#stash
    import akka.actor.Stash
    class ActorWithProtocol extends Actor with Stash {
      def receive = {
        case "open" =>
          unstashAll()
          context.become({
            case "write" => // do writing...
            case "close" =>
              unstashAll()
              context.unbecome()
            case msg => stash()
          }, discardOld = false) // stack on top instead of replacing
        case msg => stash()
      }
    }
    //#stash
  }

  "using watch" in {
    new AnyRef {
      //#watch
      import akka.actor.{ Actor, Props, Terminated }

      class WatchActor extends Actor {
        val child = context.actorOf(Props.empty, "child")
        context.watch(child) // <-- this is the only call needed for registration
        var lastSender = system.deadLetters

        def receive = {
          case "kill" =>
            context.stop(child); lastSender = sender()
          case Terminated(`child`) => lastSender ! "finished"
        }
      }
      //#watch
      val a = system.actorOf(Props(classOf[WatchActor], this))
      implicit val sender = testActor
      a ! "kill"
      expectMsg("finished")
    }
  }

  "demonstrate ActorSelection" in {
    val context = system
    //#selection-local
    // will look up this absolute path
    context.actorSelection("/user/serviceA/aggregator")
    // will look up sibling beneath same supervisor
    context.actorSelection("../joe")
    //#selection-local
    //#selection-wildcard
    // will look all children to serviceB with names starting with worker
    context.actorSelection("/user/serviceB/worker*")
    // will look up all siblings beneath same supervisor
    context.actorSelection("../*")
    //#selection-wildcard
    //#selection-remote
    context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")
    //#selection-remote
  }

  "using Identify" in {
    new AnyRef {
      //#identify
      import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }

      class Follower extends Actor {
        val identifyId = 1
        context.actorSelection("/user/another") ! Identify(identifyId)

        def receive = {
          case ActorIdentity(`identifyId`, Some(ref)) =>
            context.watch(ref)
            context.become(active(ref))
          case ActorIdentity(`identifyId`, None) => context.stop(self)

        }

        def active(another: ActorRef): Actor.Receive = {
          case Terminated(`another`) => context.stop(self)
        }
      }
      //#identify

      val a = system.actorOf(Props.empty)
      val b = system.actorOf(Props(classOf[Follower], this))
      watch(b)
      system.stop(a)
      expectMsgType[akka.actor.Terminated].actor should be(b)
    }
  }

  "using pattern gracefulStop" in {
    val actorRef = system.actorOf(Props[Manager])
    //#gracefulStop
    import akka.pattern.gracefulStop
    import scala.concurrent.Await

    try {
      val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown)
      Await.result(stopped, 6 seconds)
      // the actor has been stopped
    } catch {
      // the actor wasn't stopped within 5 seconds
      case e: akka.pattern.AskTimeoutException =>
    }
    //#gracefulStop
  }

  "using pattern ask / pipeTo" in {
    val actorA, actorB, actorC, actorD = system.actorOf(Props.empty)
    //#ask-pipeTo
    import akka.pattern.{ ask, pipe }
    import system.dispatcher // The ExecutionContext that will be used
    final case class Result(x: Int, s: String, d: Double)
    case object Request

    implicit val timeout = Timeout(5 seconds) // needed for `?` below

    val f: Future[Result] =
      for {
        x <- ask(actorA, Request).mapTo[Int] // call pattern directly
        s <- (actorB ask Request).mapTo[String] // call by implicit conversion
        d <- (actorC ? Request).mapTo[Double] // call by symbolic name
      } yield Result(x, s, d)

    f pipeTo actorD // .. or ..
    pipe(f) to actorD
    //#ask-pipeTo
  }

  class Replier extends Actor {
    def receive = {
      case ref: ActorRef =>
        //#reply-with-sender
        sender().tell("reply", context.parent) // replies will go back to parent
        sender().!("reply")(context.parent) // alternative syntax (beware of the parens!)
      //#reply-with-sender
      case x =>
        //#reply-without-sender
        sender() ! x // replies will go to this actor
      //#reply-without-sender
    }
  }

  "replying with own or other sender" in {
    val actor = system.actorOf(Props(classOf[Replier], this))
    implicit val me = testActor
    actor ! 42
    expectMsg(42)
    lastSender should be(actor)
    actor ! me
    expectMsg("reply")
    lastSender.path.toStringWithoutAddress should be("/user")
    expectMsg("reply")
    lastSender.path.toStringWithoutAddress should be("/user")
  }

  "using ActorDSL outside of akka.actor package" in {
    import akka.actor.ActorDSL._
    actor(new Act {
      superviseWith(OneForOneStrategy() { case _ => Stop; Restart; Resume; Escalate })
      superviseWith(AllForOneStrategy() { case _ => Stop; Restart; Resume; Escalate })
    })
  }

}

 

分享到:
评论

相关推荐

    scala-world-2015, scala.world 2015的源代码关于akka流/akka http.zip

    scala-world-2015, scala.world 2015的源代码关于akka流/akka http akka流/akka http-scala.world-2015会议会话源要进行测试,请在sbt中运行这里命令:log-service/re-startbackend/runMain example.repoanalyze

    scala-2.12.13.tgz

    - **Actor模型**:Scala内置对Akka框架的支持,实现高效的并发处理。 - **FP特性**:包括不可变数据结构、尾递归优化和类型类,鼓励使用函数式编程风格。 Scala 2.12.x相对于早期版本的改进可能涉及性能优化、语言...

    Reactive.Programming.with.Scala.and.Akka.1783984341.epub

    Harness reactive programming to build scalable and fault-tolerant distributed systems using Scala and Akka About This Book Use the concepts of reactive programming to build distributed systems ...

    scala-2.12.11.zip

    5. ** Actors模型**:Scala内置了Akka库,提供了Actor模型,用于构建并发和分布式系统。Actors是轻量级的并发实体,它们通过消息传递进行通信,避免了共享状态的复杂性。 6. **集合库**:Scala的集合库是其强大功能...

    scala-intellij-bin-2016.3.9

    5. **额外的工具和库**:可能还包括Scala相关的工具和库,例如 sbt(Scala构建工具)的集成,或者用于Akka、Play Framework等Scala流行框架的支持。 使用这个插件,开发者可以享受到以下优势: - **语法感知**:...

    com.ibm.jbatch-model-1.0-b28.zip

    4. **Akka FileIO**:Akka的FileIO模块提供了对文件系统的非阻塞访问,可以异步读写文件,使用Future表示操作的结果,使得I/O操作能够与其他任务并行执行,提高整体系统的效率。 5. **反应式编程**:一种编程范式,...

    scala sdk .......

    7. **Akka框架**:Scala的一个重要应用是Akka,这是一个用于构建高度并发、分布式和反应式系统的框架,它利用Scala的特性提供了强大的 Actor 模型。 8. **Spark大数据处理**:Apache Spark,一个流行的分布式计算...

    scala库 by maven依赖

    在实际开发中,除了Scala和ScalaTest,还有许多其他流行的Scala库,如Akka(用于构建并发和分布式系统)、Spark(大数据处理框架)和Play Framework(用于构建Web应用)。这些库也可以通过类似的方式在`pom.xml`中...

    scala-2.12.10.tgz

    9. Akka框架:Scala广泛用于构建分布式系统,尤其是与Akka框架结合。Akka提供了actor模型,简化了并发和容错编程。 10. Play框架:Play是一个基于Scala和Java的Web开发框架,提供了一种现代、反应式的应用开发方式...

    scala-intellij-bin-2021.3.6.zip

    安装这个插件后,用户可以享受到诸如代码高亮、自动完成、错误检测、重构工具、Scala REPL(Read-Eval-Print Loop)集成以及对Akka、Spark等Scala库的智能支持等便利。 Scala语言的特点包括: 1. **类型系统**:...

    Scala(中文完整版).zip

    Scala是一种多范式编程语言,它融合了面向对象和函数式编程的概念,旨在提供一种统一且高效的编程模型。Scala的设计目标是提高开发者的生产力,同时保持代码的可维护性和可扩展性。它运行在Java虚拟机(JVM)上,...

    Akka Scala 学习高清原版pdf

    Akka是一个用Scala和Java编写的库,用于构建并发、分布式以及容错的事件驱动应用。Akka框架借鉴了Erlang的并发模型,但它是建立在JVM之上,并且提供了丰富的抽象和工具,能够简化开发工作。 标题“Akka Scala 学习...

    用Scala写的akka actor简单demo

    标题中的“用Scala写的akka actor简单demo”指的是一个使用Scala编程语言编写的Akka Actor系统的基本示例。Akka是轻量级、基于actor模型的框架,它用于构建高度并发、分布式和容错的应用程序。Scala是多范式编程语言...

    scala-2.11.8.tar.gz

    5. ** Actors模型**:Scala内建对Akka框架的支持,其中Actors模型提供了一种处理并发和分布式计算的方式。Actors通过消息传递来通信,确保线程安全。 6. ** REPL(Read-Eval-Print Loop)**:Scala提供了交互式的...

    官网scala-2.11.8.tar.gz

    - ** Actors模型**:Scala内置对Akka框架的支持,允许并发和分布式计算,而不会遇到线程同步的问题。 - **复合性管理**:通过特质(Traits)和柯里化(Currying),Scala提供了一种组织复杂代码结构的方式。 - **...

    scala-2.12.6.tgz

    Scala广泛应用于构建高性能、分布式系统,比如Akka框架用于构建反应式应用程序,Play Framework用于构建Web应用,而Apache Spark大数据处理框架也是用Scala编写,这些都依赖于Scala-2.12.x版本的稳定性和性能。...

    Scala安装包和kafka安装包

    Scala的类库如Akka和Play Framework可以方便地与Kafka集成,实现复杂的数据处理和网络应用。Kafka则以其高吞吐量、低延迟的特性,成为实时数据流处理的首选平台。同时,由于Kafka支持多种语言的客户端,即使不是使用...

    Scala-学习资料-mht.rar

    5. Akka框架:Akka是用Scala编写的开源框架,用于构建高度可扩展、容错的应用程序,它充分利用了Scala的Actor模型。 6. Scala与Java互操作:由于Scala是运行在JVM上的,所以可以直接使用Java库,与Java代码无缝集成...

    linux上安装scala资源包 scala.zip

    此外,了解如何将Scala应用于Apache Spark、Akka等框架也会提升你的技能水平。 7. **社区支持** Scala有一个活跃的开发者社区,如Stack Overflow、GitHub和Scala User Group,这些地方可以获取帮助,参与讨论,...

    scala-2.12.12.zip

    5. **Actor模型**:Scala集成了Akka框架,支持基于Actor的消息传递并发模型,有助于构建可扩展的、容错的系统。 6. **Java互操作性**:Scala运行在JVM上,可以直接使用Java库,反之亦然,这为开发者提供了广泛的...

Global site tag (gtag.js) - Google Analytics