from:http://alvinalexander.com/java/jwarehouse/akka-2.3/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala.shtml
/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package docs.actor import language.postfixOps //#imports1 import akka.actor.Actor import akka.actor.Props import akka.event.Logging //#imports1 import scala.concurrent.Future import akka.actor.{ ActorRef, ActorSystem, PoisonPill, Terminated, ActorLogging } import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.Matchers import akka.testkit._ import akka.util._ import scala.concurrent.duration._ import scala.concurrent.Await //#my-actor class MyActor extends Actor { val log = Logging(context.system, this) def receive = { case "test" => log.info("received test") case _ => log.info("received unknown message") } } //#my-actor final case class DoIt(msg: ImmutableMessage) final case class Message(s: String) //#context-actorOf class FirstActor extends Actor { val child = context.actorOf(Props[MyActor], name = "myChild") //#plus-some-behavior def receive = { case x => sender() ! x } //#plus-some-behavior } //#context-actorOf class ActorWithArgs(arg: String) extends Actor { def receive = { case _ => () } } class DemoActorWrapper extends Actor { //#props-factory object DemoActor { /** * Create Props for an actor of this type. * @param magciNumber The magic number to be passed to this actor鈥檚 constructor. * @return a Props for creating this actor, which can then be further configured * (e.g. calling `.withDispatcher()` on it) */ def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber)) } class DemoActor(magicNumber: Int) extends Actor { def receive = { case x: Int => sender() ! (x + magicNumber) } } class SomeOtherActor extends Actor { // Props(new DemoActor(42)) would not be safe context.actorOf(DemoActor.props(42), "demo") // ... //#props-factory def receive = { case msg => } //#props-factory } //#props-factory def receive = Actor.emptyBehavior } class Hook extends Actor { var child: ActorRef = _ //#preStart override def preStart() { child = context.actorOf(Props[MyActor], "child") } //#preStart def receive = Actor.emptyBehavior //#postStop override def postStop() { //#clean-up-some-resources () //#clean-up-some-resources } //#postStop } class ReplyException extends Actor { def receive = { case _ => //#reply-exception try { val result = operation() sender() ! result } catch { case e: Exception => sender() ! akka.actor.Status.Failure(e) throw e } //#reply-exception } def operation(): String = { "Hi" } } //#gracefulStop-actor object Manager { case object Shutdown } class Manager extends Actor { import Manager._ val worker = context.watch(context.actorOf(Props[Cruncher], "worker")) def receive = { case "job" => worker ! "crunch" case Shutdown => worker ! PoisonPill context become shuttingDown } def shuttingDown: Receive = { case "job" => sender() ! "service unavailable, shutting down" case Terminated(`worker`) => context stop self } } //#gracefulStop-actor class Cruncher extends Actor { def receive = { case "crunch" => // crunch... } } //#swapper case object Swap class Swapper extends Actor { import context._ val log = Logging(system, this) def receive = { case Swap => log.info("Hi") become({ case Swap => log.info("Ho") unbecome() // resets the latest 'become' (just for fun) }, discardOld = false) // push on top instead of replace } } object SwapperApp extends App { val system = ActorSystem("SwapperSystem") val swap = system.actorOf(Props[Swapper], name = "swapper") swap ! Swap // logs Hi swap ! Swap // logs Ho swap ! Swap // logs Hi swap ! Swap // logs Ho swap ! Swap // logs Hi swap ! Swap // logs Ho } //#swapper //#receive-orElse trait ProducerBehavior { this: Actor => val producerBehavior: Receive = { case GiveMeThings => sender() ! Give("thing") } } trait ConsumerBehavior { this: Actor with ActorLogging => val consumerBehavior: Receive = { case ref: ActorRef => ref ! GiveMeThings case Give(thing) => log.info("Got a thing! It's {}", thing) } } class Producer extends Actor with ProducerBehavior { def receive = producerBehavior } class Consumer extends Actor with ActorLogging with ConsumerBehavior { def receive = consumerBehavior } class ProducerConsumer extends Actor with ActorLogging with ProducerBehavior with ConsumerBehavior { def receive = producerBehavior orElse consumerBehavior } // protocol case object GiveMeThings final case class Give(thing: Any) //#receive-orElse class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "import context" in { new AnyRef { //#import-context class FirstActor extends Actor { import context._ val myActor = actorOf(Props[MyActor], name = "myactor") def receive = { case x => myActor ! x } } //#import-context val first = system.actorOf(Props(classOf[FirstActor], this), name = "first") system.stop(first) } } "creating actor with system.actorOf" in { val myActor = system.actorOf(Props[MyActor]) // testing the actor // TODO: convert docs to AkkaSpec(Map(...)) val filter = EventFilter.custom { case e: Logging.Info => true case _ => false } system.eventStream.publish(TestEvent.Mute(filter)) system.eventStream.subscribe(testActor, classOf[Logging.Info]) myActor ! "test" expectMsgPF(1 second) { case Logging.Info(_, _, "received test") => true } myActor ! "unknown" expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") => true } system.eventStream.unsubscribe(testActor) system.eventStream.publish(TestEvent.UnMute(filter)) system.stop(myActor) } "creating a Props config" in { //#creating-props import akka.actor.Props val props1 = Props[MyActor] val props2 = Props(new ActorWithArgs("arg")) // careful, see below val props3 = Props(classOf[ActorWithArgs], "arg") //#creating-props //#creating-props-deprecated // NOT RECOMMENDED within another actor: // encourages to close over enclosing class val props7 = Props(new MyActor) //#creating-props-deprecated } "creating actor with Props" in { //#system-actorOf import akka.actor.ActorSystem // ActorSystem is a heavy object: create only one per application val system = ActorSystem("mySystem") val myActor = system.actorOf(Props[MyActor], "myactor2") //#system-actorOf shutdown(system) } "creating actor with IndirectActorProducer" in { class Echo(name: String) extends Actor { def receive = { case n: Int => sender() ! name case message => val target = testActor //#forward target forward message //#forward } } val a: { def actorRef: ActorRef } = new AnyRef { val applicationContext = this //#creating-indirectly import akka.actor.IndirectActorProducer class DependencyInjector(applicationContext: AnyRef, beanName: String) extends IndirectActorProducer { override def actorClass = classOf[Actor] override def produce = //#obtain-fresh-Actor-instance-from-DI-framework new Echo(beanName) def this(beanName: String) = this("", beanName) //#obtain-fresh-Actor-instance-from-DI-framework } val actorRef = system.actorOf( Props(classOf[DependencyInjector], applicationContext, "hello"), "helloBean") //#creating-indirectly } val actorRef = { import scala.language.reflectiveCalls a.actorRef } val message = 42 implicit val self = testActor //#tell actorRef ! message //#tell expectMsg("hello") actorRef ! "huhu" expectMsg("huhu") } "using implicit timeout" in { val myActor = system.actorOf(Props[FirstActor]) //#using-implicit-timeout import scala.concurrent.duration._ import akka.util.Timeout import akka.pattern.ask implicit val timeout = Timeout(5 seconds) val future = myActor ? "hello" //#using-implicit-timeout Await.result(future, timeout.duration) should be("hello") } "using explicit timeout" in { val myActor = system.actorOf(Props[FirstActor]) //#using-explicit-timeout import scala.concurrent.duration._ import akka.pattern.ask val future = myActor.ask("hello")(5 seconds) //#using-explicit-timeout Await.result(future, 5 seconds) should be("hello") } "using receiveTimeout" in { //#receive-timeout import akka.actor.ReceiveTimeout import scala.concurrent.duration._ class MyActor extends Actor { // To set an initial delay context.setReceiveTimeout(30 milliseconds) def receive = { case "Hello" => // To set in a response to a message context.setReceiveTimeout(100 milliseconds) case ReceiveTimeout => // To turn it off context.setReceiveTimeout(Duration.Undefined) throw new RuntimeException("Receive timed out") } } //#receive-timeout } //#hot-swap-actor class HotSwapActor extends Actor { import context._ def angry: Receive = { case "foo" => sender() ! "I am already angry?" case "bar" => become(happy) } def happy: Receive = { case "bar" => sender() ! "I am already happy :-)" case "foo" => become(angry) } def receive = { case "foo" => become(angry) case "bar" => become(happy) } } //#hot-swap-actor "using hot-swap" in { val actor = system.actorOf(Props(classOf[HotSwapActor], this), name = "hot") } "using Stash" in { //#stash import akka.actor.Stash class ActorWithProtocol extends Actor with Stash { def receive = { case "open" => unstashAll() context.become({ case "write" => // do writing... case "close" => unstashAll() context.unbecome() case msg => stash() }, discardOld = false) // stack on top instead of replacing case msg => stash() } } //#stash } "using watch" in { new AnyRef { //#watch import akka.actor.{ Actor, Props, Terminated } class WatchActor extends Actor { val child = context.actorOf(Props.empty, "child") context.watch(child) // <-- this is the only call needed for registration var lastSender = system.deadLetters def receive = { case "kill" => context.stop(child); lastSender = sender() case Terminated(`child`) => lastSender ! "finished" } } //#watch val a = system.actorOf(Props(classOf[WatchActor], this)) implicit val sender = testActor a ! "kill" expectMsg("finished") } } "demonstrate ActorSelection" in { val context = system //#selection-local // will look up this absolute path context.actorSelection("/user/serviceA/aggregator") // will look up sibling beneath same supervisor context.actorSelection("../joe") //#selection-local //#selection-wildcard // will look all children to serviceB with names starting with worker context.actorSelection("/user/serviceB/worker*") // will look up all siblings beneath same supervisor context.actorSelection("../*") //#selection-wildcard //#selection-remote context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB") //#selection-remote } "using Identify" in { new AnyRef { //#identify import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated } class Follower extends Actor { val identifyId = 1 context.actorSelection("/user/another") ! Identify(identifyId) def receive = { case ActorIdentity(`identifyId`, Some(ref)) => context.watch(ref) context.become(active(ref)) case ActorIdentity(`identifyId`, None) => context.stop(self) } def active(another: ActorRef): Actor.Receive = { case Terminated(`another`) => context.stop(self) } } //#identify val a = system.actorOf(Props.empty) val b = system.actorOf(Props(classOf[Follower], this)) watch(b) system.stop(a) expectMsgType[akka.actor.Terminated].actor should be(b) } } "using pattern gracefulStop" in { val actorRef = system.actorOf(Props[Manager]) //#gracefulStop import akka.pattern.gracefulStop import scala.concurrent.Await try { val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown) Await.result(stopped, 6 seconds) // the actor has been stopped } catch { // the actor wasn't stopped within 5 seconds case e: akka.pattern.AskTimeoutException => } //#gracefulStop } "using pattern ask / pipeTo" in { val actorA, actorB, actorC, actorD = system.actorOf(Props.empty) //#ask-pipeTo import akka.pattern.{ ask, pipe } import system.dispatcher // The ExecutionContext that will be used final case class Result(x: Int, s: String, d: Double) case object Request implicit val timeout = Timeout(5 seconds) // needed for `?` below val f: Future[Result] = for { x <- ask(actorA, Request).mapTo[Int] // call pattern directly s <- (actorB ask Request).mapTo[String] // call by implicit conversion d <- (actorC ? Request).mapTo[Double] // call by symbolic name } yield Result(x, s, d) f pipeTo actorD // .. or .. pipe(f) to actorD //#ask-pipeTo } class Replier extends Actor { def receive = { case ref: ActorRef => //#reply-with-sender sender().tell("reply", context.parent) // replies will go back to parent sender().!("reply")(context.parent) // alternative syntax (beware of the parens!) //#reply-with-sender case x => //#reply-without-sender sender() ! x // replies will go to this actor //#reply-without-sender } } "replying with own or other sender" in { val actor = system.actorOf(Props(classOf[Replier], this)) implicit val me = testActor actor ! 42 expectMsg(42) lastSender should be(actor) actor ! me expectMsg("reply") lastSender.path.toStringWithoutAddress should be("/user") expectMsg("reply") lastSender.path.toStringWithoutAddress should be("/user") } "using ActorDSL outside of akka.actor package" in { import akka.actor.ActorDSL._ actor(new Act { superviseWith(OneForOneStrategy() { case _ => Stop; Restart; Resume; Escalate }) superviseWith(AllForOneStrategy() { case _ => Stop; Restart; Resume; Escalate }) }) } }
相关推荐
scala-world-2015, scala.world 2015的源代码关于akka流/akka http akka流/akka http-scala.world-2015会议会话源要进行测试,请在sbt中运行这里命令:log-service/re-startbackend/runMain example.repoanalyze
- **Actor模型**:Scala内置对Akka框架的支持,实现高效的并发处理。 - **FP特性**:包括不可变数据结构、尾递归优化和类型类,鼓励使用函数式编程风格。 Scala 2.12.x相对于早期版本的改进可能涉及性能优化、语言...
Harness reactive programming to build scalable and fault-tolerant distributed systems using Scala and Akka About This Book Use the concepts of reactive programming to build distributed systems ...
5. ** Actors模型**:Scala内置了Akka库,提供了Actor模型,用于构建并发和分布式系统。Actors是轻量级的并发实体,它们通过消息传递进行通信,避免了共享状态的复杂性。 6. **集合库**:Scala的集合库是其强大功能...
5. **额外的工具和库**:可能还包括Scala相关的工具和库,例如 sbt(Scala构建工具)的集成,或者用于Akka、Play Framework等Scala流行框架的支持。 使用这个插件,开发者可以享受到以下优势: - **语法感知**:...
4. **Akka FileIO**:Akka的FileIO模块提供了对文件系统的非阻塞访问,可以异步读写文件,使用Future表示操作的结果,使得I/O操作能够与其他任务并行执行,提高整体系统的效率。 5. **反应式编程**:一种编程范式,...
7. **Akka框架**:Scala的一个重要应用是Akka,这是一个用于构建高度并发、分布式和反应式系统的框架,它利用Scala的特性提供了强大的 Actor 模型。 8. **Spark大数据处理**:Apache Spark,一个流行的分布式计算...
在实际开发中,除了Scala和ScalaTest,还有许多其他流行的Scala库,如Akka(用于构建并发和分布式系统)、Spark(大数据处理框架)和Play Framework(用于构建Web应用)。这些库也可以通过类似的方式在`pom.xml`中...
9. Akka框架:Scala广泛用于构建分布式系统,尤其是与Akka框架结合。Akka提供了actor模型,简化了并发和容错编程。 10. Play框架:Play是一个基于Scala和Java的Web开发框架,提供了一种现代、反应式的应用开发方式...
安装这个插件后,用户可以享受到诸如代码高亮、自动完成、错误检测、重构工具、Scala REPL(Read-Eval-Print Loop)集成以及对Akka、Spark等Scala库的智能支持等便利。 Scala语言的特点包括: 1. **类型系统**:...
Scala是一种多范式编程语言,它融合了面向对象和函数式编程的概念,旨在提供一种统一且高效的编程模型。Scala的设计目标是提高开发者的生产力,同时保持代码的可维护性和可扩展性。它运行在Java虚拟机(JVM)上,...
Akka是一个用Scala和Java编写的库,用于构建并发、分布式以及容错的事件驱动应用。Akka框架借鉴了Erlang的并发模型,但它是建立在JVM之上,并且提供了丰富的抽象和工具,能够简化开发工作。 标题“Akka Scala 学习...
标题中的“用Scala写的akka actor简单demo”指的是一个使用Scala编程语言编写的Akka Actor系统的基本示例。Akka是轻量级、基于actor模型的框架,它用于构建高度并发、分布式和容错的应用程序。Scala是多范式编程语言...
5. ** Actors模型**:Scala内建对Akka框架的支持,其中Actors模型提供了一种处理并发和分布式计算的方式。Actors通过消息传递来通信,确保线程安全。 6. ** REPL(Read-Eval-Print Loop)**:Scala提供了交互式的...
- ** Actors模型**:Scala内置对Akka框架的支持,允许并发和分布式计算,而不会遇到线程同步的问题。 - **复合性管理**:通过特质(Traits)和柯里化(Currying),Scala提供了一种组织复杂代码结构的方式。 - **...
Scala广泛应用于构建高性能、分布式系统,比如Akka框架用于构建反应式应用程序,Play Framework用于构建Web应用,而Apache Spark大数据处理框架也是用Scala编写,这些都依赖于Scala-2.12.x版本的稳定性和性能。...
Scala的类库如Akka和Play Framework可以方便地与Kafka集成,实现复杂的数据处理和网络应用。Kafka则以其高吞吐量、低延迟的特性,成为实时数据流处理的首选平台。同时,由于Kafka支持多种语言的客户端,即使不是使用...
5. Akka框架:Akka是用Scala编写的开源框架,用于构建高度可扩展、容错的应用程序,它充分利用了Scala的Actor模型。 6. Scala与Java互操作:由于Scala是运行在JVM上的,所以可以直接使用Java库,与Java代码无缝集成...
此外,了解如何将Scala应用于Apache Spark、Akka等框架也会提升你的技能水平。 7. **社区支持** Scala有一个活跃的开发者社区,如Stack Overflow、GitHub和Scala User Group,这些地方可以获取帮助,参与讨论,...
5. **Actor模型**:Scala集成了Akka框架,支持基于Actor的消息传递并发模型,有助于构建可扩展的、容错的系统。 6. **Java互操作性**:Scala运行在JVM上,可以直接使用Java库,反之亦然,这为开发者提供了广泛的...