`
deepinmind
  • 浏览: 450963 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
1dc14e59-7bdf-33ab-841a-02d087aed982
Java函数式编程
浏览量:41568
社区版块
存档分类
最新评论

Akka笔记之配置及调度

阅读更多
正如前几篇文章中所看到的,我们可以通过ActorSystem的actorOf方法来创建一个Actor。其实ActorSystem还可以做许多事情。本篇中我们来看下如何用它来进行配置及调度。

我们先看下ActorSystem里的方法的一个子集。



1. 配置管理

还记得前面](http://it.deepinmind.com/akka/2014/10/22/akka-notes-logging-and-testing.html)我们用来配置日志级别的那个application.conf文件吗?它就好比Java程序里的那些.properties文件一样。马上我们会看到如何通过这个配置文件来自定义分发器以及邮箱等。(这么说好像对[typesafe config](https://github.com/typesafehub/config)而言不太公平。看过[这几个例子之后你就知道它有多棒了)

当我们通过ActorSystem对象的apply方法创建ActorSystem实例而不指定任何配置的时候,它会在classpath的根路径下依次去查找application.conf, application.json以及application.peroperties文件并自动进行加载。

因此



val system=ActorSystem("UniversityMessagingSystem")




等同于


val system=ActorSystem("UniversityMessagingSystem", ConfigFactory.load()) 





为了证实这一说法,我们来看下ActorSystem.scala里的apply方法。


def apply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = {
    val cl = classLoader.getOrElse(findClassLoader())
    val appConfig = config.getOrElse(ConfigFactory.load(cl))
    new ActorSystemImpl(name, appConfig, cl, defaultExecutionContext).start()
  }
 


A. 覆盖默认配置

如果你不喜欢使用application.conf的话(比如说在测试用例中),或者希望能自己指定配置文件(比方说想测试不同的配置文件或者部署到不同的环境上),你可以传入一个自己的配置来覆盖掉它,这样就不会去classpath中读取默认的配置了。

使用ConfigFactory.parseString


val actorSystem=ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]"""))
 


或者

在测试用例里也很简单


class TeacherTestLogListener extends TestKit(ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]"""))) 
  with WordSpecLike
  with MustMatchers
  with BeforeAndAfterAll {




还可以使用ConfigFactory.load


val system = ActorSystem("UniversityMessageSystem", ConfigFactory.load("uat-application.conf"))



如果你需要在运行时读取自己的配置参数的话,你可以通过它提供的API来完成:


val system=ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")) 
println (system.settings.config.getValue("akka.loggers")) // Results in > SimpleConfigList(["akka.testkit.TestEventListener"])



B. 扩展默认配置

除了覆盖原有文件之外,你还可以通过Config类的withFallback方法来使用自己的自定义配置去扩展默认配置。

假设你的application.conf是这样的:


akka{ 
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = DEBUG
    arun="hello"
}



而你希望覆盖掉它的akka.loggers属性:


val config=ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")
    val system=ActorSystem("UniversityMessageSystem", config.withFallback(ConfigFactory.load()))
 

 
最终你拿到的是两个配置合并之后的结果:


println (system.settings.config.getValue("akka.arun")) //> ConfigString("hello")
    println (system.settings.config.getValue("akka.loggers")) //> SimpleConfigList(["akka.testkit.TestEventListener"])


  
那么,为什么要介绍这个配置呢?因为ActorSystem就是用来加载并访问配置信息的。
  • *重要提示:**

  • 请注意一下Fallback方法的顺序——哪个是默认配置,哪个又是扩展配置。记住,你是要回退到默认的配置上去,也就是说

    
    config.withFallback(ConfigFactory.load())
    
    


    是OK的,而

    
    ConfigFactory.load().withFallback(config) 
    
    


    则不是你想要的结果。

    2. 调度器



    ActorSystem的API中可以看出,ActorSystem中有一个叫schedule的小而强大的方法,它会返回给你一个Scheduler。这个调度器有一系列的调度方法,我们可以用它们来完成许多有意思的事情。

    A. 一次性调度



    还是使用学生-老师那个例子,假设StudentActor希望在收到InitSignal消息5秒之后才向老师发送请求,而不是立即发送,那么代码应该这么写:

    
    class StudentDelayedActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {
    
      def receive = {
        case InitSignal=> {
          import context.dispatcher
          context.system.scheduler.scheduleOnce(5 seconds, teacherActorRef, QuoteRequest)
          //teacherActorRef!QuoteRequest
        }
        ...
        ...
      }
    }
    
    


    测试用例

    我们来编写一个简单的用例去验证下这个功能:

    
    "A delayed student" must {
    
        "fire the QuoteRequest after 5 seconds when an InitSignal is sent to it" in {
    
          import me.rerun.akkanotes.messaging.protocols.StudentProtocol._
    
          val teacherRef = system.actorOf(Props[TeacherActor], "teacherActorDelayed")
          val studentRef = system.actorOf(Props(new StudentDelayedActor(teacherRef)), "studentDelayedActor")
    
          EventFilter.info (start="Printing from Student Actor", occurrences=1).intercept{
              studentRef!InitSignal
          }
        }
    
      }
     
    



    延长Eventfilter拦截器的超时时间

    噢,不妙。Eventfilter等待EventStream中出现消息的默认超时时间只有3秒。为了能验证我们这个用例,我们把这个超时时间增加到7秒。filter-leeway配置项就能帮助我们完成这点。

    
    class RequestResponseTest extends TestKit(ActorSystem("TestUniversityMessageSystem", ConfigFactory.parseString(""" 
                                                akka{
                                                  loggers = ["akka.testkit.TestEventListener"]
                                                  test{
                                                      filter-leeway = 7s
                                                  }
                                                }
                                        """)))
      with WordSpecLike
      with MustMatchers
      with BeforeAndAfterAll
      with ImplicitSender {
      ...
      ...
     
    





    B. 循环调度

    要想重复地执行某个动作,你可以使用Scheduler的schedule方法。

    最常用的一个schedule方法就是定期地向一个Actor发送消息。这个方法接收4个参数:

    1. 首次执行开始的延迟时间
    2. 下一次执行的间隔时间
    3. 接收消息的目标ActorRef
    4. 消息

    
    case InitSignal=> { 
          import context.dispatcher
          context.system.scheduler.schedule(0 seconds, 5 seconds, teacherActorRef, QuoteRequest)
          //teacherActorRef!QuoteRequest
        }
    
    

      
      
    注意事项

    这里import context.dispatcher这条语句非常重要。

    schedule方法需要一个非常重要的隐式参数——ExecutionContext,看一下schedule方法的实现你就会明白为什么这个参数如此重要了:

    
    final def schedule( 
        initialDelay: FiniteDuration,
        interval: FiniteDuration,
        receiver: ActorRef,
        message: Any)(implicit executor: ExecutionContext,
                      sender: ActorRef = Actor.noSender): Cancellable =
        schedule(initialDelay, interval, new Runnable {
          def run = {
            receiver ! message
            if (receiver.isTerminated)
              throw new SchedulerException("timer active for terminated actor")
          }
        })
       
    
     
      
    schedule方法只是把消息发送封装到了一个Runnable中,而它最终是由传进来的ExecutionContext来执行的。

    为了使得ExecutionContext在作用域内隐式可用,我们使用到了在上下文中可用的隐式分发器。

    摘自ActorCell.scala

    
    /**
       * Returns the dispatcher (MessageDispatcher) that is used for this Actor.
       * Importing this member will place an implicit ExecutionContext in scope.
       */
      implicit def dispatcher: ExecutionContextExecutor
     
    
      

    代码


    同样的,项目的完整代码可以从Github中进行下载。



    原创文章转载请注明出处:http://it.deepinmind.com

    英文原文链接
    3
    1
    分享到:
    评论

    相关推荐

      Akka 实战 akka in action v13 2014版本

      Akka的应用程序结构设计为高度模块化和可配置的,这允许开发者在不同环境中快速调整应用程序的行为。 Akka的容错性也是书中的重点之一。Akka通过监督策略和容错机制,提供了系统在面对错误时的自动恢复能力。这种...

      akka java实现tcp远程调用

      它是整个Akka应用的入口点,负责创建、管理和调度Actor。我们首先需要创建一个`ActorSystem`实例,并指定一个名称,例如`"RemotingSystem"`。接着,我们需要配置远程部署(remote deployment)以启用TCP通信。在`...

      akka-quartz-scheduler:Quartz扩展和实用程序,用于Akka中的cron式调度

      阿卡石英计 Quartz扩展和实用程序,可在Akka 2.6.x中进行真正的调度。 当前版本是针对Scala 2.13.x和Akka 2.6.x构建的,可在Maven Central上使用。 如果您想支持Scala和Akka的其他组合,只需将您的请求发布在问题...

      akka实例参考

      1. **Actor系统**:Akka中的核心组件是Actor系统,它是所有Actor的容器,负责管理和调度Actor。每个Actor系统都有一个根Actor,其他Actor都是其子Actor。 2. **Actor**:Actor是Akka的基本执行单元,它通过消息传递...

      AkkaJava PDF Doc

      Akka是一个建立在Actor模型之上的工具包,它提供了一套用于构建并发、分布式和容错应用程序的API和服务。Akka可以运行在单个JVM上,也可以配置为在分布式系统中运行,其中的Actor可以跨多台机器进行通信。 #### ...

      akka-kryo-serialization, 基于Kryo的Akka序列化.zip

      akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...

      akka的教学

      Actor 系统是 Akka 的核心组件之一,负责管理 Actor 的生命周期。本节详细解释了如何创建 Actor 系统、如何在其内部创建 Actor 实例以及如何配置 Actor 系统的属性。 ##### 2.3 什么是 Actor? Actor 是 Akka 中最...

      akka java document

      - **Actor:** Akka 的核心组件之一,它负责接收并处理消息。 - **Actor 系统:** 管理 Actor 的容器,负责 Actor 的生命周期管理。 - **消息传递:** Actor 之间通信的主要方式,通过发送消息来进行交互。 - **监督...

      Akka开发库文档

      - 介绍了Akka的监督、监控、故障容错、类型化Actor、调度器、消息箱、路由、有限状态机Actor构建、持久化以及测试Actor系统的相关知识。 - 详述了如何使用Lambda表达式在Java中编写Actor、实现故障容错、状态机和...

      c#分布式框架akka范例

      在**akkadotnet-code-samples-master**这个压缩包中,你可能会找到不同类型的示例代码,如基本Actor的创建和交互,Actor的远程部署,集群配置,以及使用流处理数据等。这些示例将帮助你理解和掌握如何在实际项目中...

      Learning Akka(PACKT,2015)

      Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...

      akka2.0使用方法

      12. **调度器**:Akka的调度器负责安排Actor的消息处理,可以自定义调度策略以满足特定需求。 13. **网络通信**:Akka使用TCP或UDP协议进行网络通信,提供高效的远程Actor调用。 14. **测试Actor**:Akka提供了`...

      Akka.net分布式数据传输

      在分布式系统中,数据传输是核心功能之一,Akka.NET 提供了高效且可靠的手段来实现在网络中的节点间通信。本文将深入探讨 Akka.NET 的分布式数据传输机制及其应用。 ### 1. Akka.NET 概述 Akka.NET 是基于 Actor ...

      Akka Java文档

      Akka 支持丰富的配置选项,包括 Actor 的创建参数、调度策略等。这些配置可以通过配置文件或者代码动态设置。 #### 三、深入 Actor ##### 3.1 Actors 深入探讨 Actor 的各个方面,包括创建 Actor、定义行为、消息...

      scala akka

      - **Building Akka**:介绍了如何构建Akka项目,包括依赖管理、编译配置等。 - **Multi-JVM Testing**:多JVM测试确保了在不同环境下的兼容性。 - **I/O Layer Design**:I/O层的设计对于处理大量数据非常重要。 - *...

      akka-in-action完整示例源代码

      5. **调度器**:Akka内置的调度器负责管理Actor的工作线程,可以根据配置进行定制,以适应不同的性能需求。 6. **流(Streams)**:Akka Streams提供了一种处理数据流的声明式方式,它遵循反应式宣言,支持背压机制...

      akka-in-action完整源代码

      - **路由与调度**:Akka的路由机制如何将消息分发到多个Actor,以及调度器如何决定哪个Actor应该处理消息。 - **监控与故障恢复**:学习如何配置和使用SupervisorStrategy,以及如何处理Actor故障和重启。 - **...

      Akka开发库开发包

      Akka的Actor系统可以轻松地创建、管理和调度大量Actor,这使得构建大规模并发应用成为可能。 Akka支持Java和Scala两种编程语言,提供了丰富的API供开发者使用。标签中的"JAVA Scala"表明这个开发包同时兼容这两种...

    Global site tag (gtag.js) - Google Analytics