`
spartan1
  • 浏览: 365252 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

akka源码分析

 
阅读更多

看akka源码的一些体会,没有列出源码来。akka代码主要包括两块:底层分发(akka.dispatch包)和上层模型(akka.actor包),从底层线程调度(dispatch)往上看起

 

函数式语言主要处理表达式求值,面向对象语言主要处理对象间消息发送消息。

 

 

1. 底层线程调度

 

Doug Lea: ForkJoinTask

ForkJoinTask是用少数线程执行海量独立任务的极好架构,这里的独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题。

ForkJoinTask的实现包括三个类:

ForkJoinPool: 实现了ExecutorService,提供execute、submit等线程池基本方法,池中的线程都是ForkJoinWorkerThread;

ForkJoinWorkerThread: 继承自Thread,包含了自己的ForkJoin任务队列,在处理完自己任务队列中任务的时候,可以从其他Worker的队列中

偷任务来执行;

ForkJoinTask: 实现了Future接口,可以直接作为ForkJoinPool.submit的返回值,提供的fork方法将自己放到当前Worker线程的任务队列中,

join方法让当前线程等待任务完成,或者通过偷过来等方式自己执行该任务

 

为了性能考虑,这三个类紧耦合,存在大量互相访问成员属性的情况,Doug Lea老先生说,这种比较ugly的实现,能让性能提高四倍,可以每秒

处理10亿级别的ForkJoin任务。

为了处理并发,大量使用了sun.misc.Unsafe类中提供的直接对内存的CAS(compare and swap)原子操作,为了解决可能的乱序执行

导致的问题,整个代码中都充斥着在if条件判断中对变量赋值的操作,感觉就是在看C代码。

 

ForkJoinTask是多核单进程版本的MapReduceJob。

 

 

2. 上层actor模型

 

Actor是用户态定义的类型,用户能够看到的Actor都是从这个类型来的。用户能看到的actor是trait akka.actor.Actor,这个只是actor对外的

一个门面,actor要访问actor系统内部的功能,基本上都要通过ActorContext来访问。

 

ActorCell是actor的内部表示,实现了ActorContext这个trait,所有的功能基本上都是在ActorCell提供的。ActorCell占用64字节。

 

ActorContext是从actor的角度看到的ActorCell的视图,提供了设置接收超时、自身引用、become/unbecome、获取sender引用、

获取children引用列表、获取MessageDispatcher、获取ActorSystem、获取parent引用、watch/unwatch一个actor的方法,

因ActorContext继承了ActorRefFactory,所以也有actorOf、actorFor等创建/获取actorRef的能力。

 

Actor/ActorCell和enipcore的Service/ServiceBase概念一模一样,都是一个是系统外面向用户的,一个是系统内进行调度的。

 

ActorRef是用户看到的对Actor的引用,任何对actor的访问,都是通过ActorRef来的。ActorRef提供了获取path、tell/forward消息的功能

 

实际上内部是使用一个InternalActorRef来表示ActorRef的,InternalActorRef继承自ActorRef,提供了Actor生命周期管理的接口。

LocalActorRef实现了InternalActorRef,是本节点中真正的actorRef实现,其中会创建并启动ActorCell。

 

ActorSystem在创建时,LocalActorRefProvider会创建rootGuardian(根actor),然后rootGuardian下会创建面向用户态actor的

guardian,这两个都是InternalActorRef,是通过直接new LocalActorRef创建出来的,这两个guardian的Actor类都是Guardian。

 

在actor内部创建子actor时,执行的是context.actorOf方法,context实际上就是ActorCell,ActorCell.actorOf调用了

LocalActorRefProvider.actorOf方法,直接new一个LocalActorRef出来,而新创建的LocalActorRef会创建ActorCell,并调用其

start方法,ActorCell.start方法中,将创建mailbox,并向mailbox中发送一个Create系统消息,然后让dispatcher开始调度mailbox

 

执行ActorSystem.actorOf方法创建actor时,实际上向guardian这个Actor发送CreateChild消息,让它创建一个actor。guardian在

收到CreateChild消息时,调用context.actorOf方法创建新actor,这个就与在actor内部创建子actor的做法一样了。

 

 

3. Actor模型和线程模型如何结合

 

MessageQueue实现了入队列enqueue(receiver:ActorRef, handle: Envelope),出队列dequeue():Envelope

SystemMessageQueue提供了systemEnqueue(receiver:ActorRef, message: SystemMessage),全部出队列systemDrain():SystemMessage方法。

其中,Envelope封装了message:Any和sender:ActorRef两个成员,而SystemMessage实际上是一个LinkedList,包含了所有的系统消息。

 

MailBox继承自系统消息队列SystemMessageQueue,实现了Runnable接口,同时包含了一个ActorCell成员,一个MessageQueue成员

MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同,比如UnboundedMailbox将创建ConcurrentLinkedQueue

 

Dispatchers根据ID生成Dispatcher,ActorSystem中有一个默认的dispatcher,dispatcher底层有executor,executor有两种ForkJoinExecutor和

ThreadPoolExecutor,默认是ForkJoinExecutor。

 

另外,scala中的val都是在对象初始化时就执行的

 

3.1 在创建ActorSystem时,初始化默认的dispatcher,使用默认的ForkJoinPool(ExecutorService)

3.2 在使用actorRef ! Message发送消息时,调用了actorRef对应的actorCell.tell方法,其中调用了dispatcher.dispatch方法

    dispatch(akka/dispatch/Dispather.scala)中做了两件事:

      一是将消息放到actorCell的消息队列中(mbox.enqueue(receiver.self, invocation))

      二是调用dispather底层的线程池executor.execute(mbox)(registerForExecution(mbox, true, false))执行mbox.run()方法

        而mbox.run()中,将先从SystemMessage链表中处理系统消息,然后从MessageQueue成员中处理用户消息。处理系统消息时,

        调用actorCell.systemInvoke方法,将所有的系统消息顺序全部处理完;处理用户消息时,调用actorCell.invoke方法,根据dispatcher

        的throughput决定本次处理多少条消息,根据dispatcher的throughputDeadlineTime决定本次处理多长时间,时间长度在处理

        完一条消息后检查一次。

 

    对于ForkJoinPool这种executor,每次执行execute(mbox)时,实际上都是先创建一个继承自ForkJoinTask的MailboxExecutionTask,

    其中的exec方法调用mbox.run方法,因此每次执行都会创建一个ForkJoinTask对象。

 

    还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),而执行的task对象会放到Executor的

    每个线程对应的工作队列中,task和消息分别使用不同的队列。

 

 

4. 定时处理

 

actorSystem在初始化时,会创建scheduler。scheduler内部维护HashedWheelTimer定时器,schedular提供schedule、scheduleOnce等方法,

可以在指定时间之后执行一个task,或者向某个actor发送一个消息。执行task时,使用system.dispatcher执行。

 

schedule主要在状态机FSM、actor.receive接收超时中使用。actor.receive中使用时,首先实现actor.preStart方法,其中调用setReceiveTimeout设置超时时间,在每个receive方法中,需要能够处理ReceiveTimeout事件,如果需要再次超时时,需要再次设置超时事件。只有receive处理完了所有的事件并且设置了超时事件后,超时才会被再次设置

 

内部实现上,actorCell通过调用checkReceiveTimeout方法调用系统scheduler设置一个一次性的超时事件。在actorCell处理Create系统消息时,创建了actor后,首先调用其actor.preStart方法,然后执行checkReceiveTimeout判断是否设置超时。

 

 

5. FSM的实现

 

akka提供了FSM的实现,该实现基于actor模型,提供了状态与状态数据定义、超时等一系列状态机相关的模型和方法

 

 

6. akka如何与耗时系统进行交互,即akka如何与外部系统进行适配(待续)

 

 

7. 在play中的应用(待续)

 

总结:

akka中重点的类都在akka.actor和akka.dispatch两个包中。前者提供了actor模型的抽象和语义,后者提供了底层执行机制。

ActorSystem是系统的控制中心,这里汇聚了用于线程调度的dispatcher,用于定时处理的scheduler,用于创建actor的provider。

dispatcher提供了dispatch/dispatchSystem/execute等多种执行轻量级任务的方法

 

akka中,还有监控(supervise)、Promise/Future、与外部系统交互、Patterns、路由还没有看,暂时不看了。


分享到:
评论
1 楼 rhtwj6231 2013-05-28  
你好,请问有没方法获取MailBox中的队列长度呢?或者通过ActorCell是否可以获取?

相关推荐

    akka-in-action完整源代码

    3. Akka源码分析: - **Actor定义**:通过阅读源码,我们可以理解如何定义一个Actor,包括其接收的消息类型、行为函数和生命周期方法。 - **路由与调度**:Akka的路由机制如何将消息分发到多个Actor,以及调度器...

    akka-streams-kafka-examples-源码.rar

    6. **源码分析** - **Example代码**: 压缩包中的源码提供了各种使用场景,如简单的生产消费、分组聚合、水印窗口等,通过对这些示例的学习,可以更深入地理解Akka Streams与Kafka的集成。 总结,"akka-streams-...

    Spark源码分析.pdf

    《Spark源码分析》这本书是针对那些希望深入了解大数据处理框架Spark以及与其紧密相关的Hadoop技术的专业人士所编写的。Spark作为一个快速、通用且可扩展的数据处理引擎,已经在大数据领域占据了重要地位,而深入...

    java8集合源码分析-akka-comparison:阿卡比较

    集合源码分析 java 实例来自from Oracle官方并发教程 Thread Objects 线程对象 #####定义并启动一个线程 任务:创建线程打印hello world com.oracle.sec2.thread_objects.HelloThread 定义并启动一个Actor也是很简单...

    akka的教学

    列举了一些实际项目中 Akka 的成功应用案例,如金融交易系统、游戏服务器、实时数据分析平台等,帮助读者了解 Akka 如何解决实际问题。 #### 二、一般概念 这一部分深入探讨了 Akka 的核心概念和技术细节。 ##### ...

    spark源码分析.pdf

    Spark源码分析是一项深入理解Apache Spark内部工作机制的重要途径。Apache Spark是一个快速、通用、可扩展的大数据处理平台,拥有着内存计算的特性,它提供了RDD(弹性分布式数据集)、DAG(有向无环图)、stage、...

    Akka Actor模型开发库 v2.6.12-源码.zip

    《Akka Actor模型开发库 v2.6.12 源码解析》 Akka是基于Actor模型的高性能、高并发、分布式计算框架,它由Lightbend公司(前身...对于想深入了解Akka的开发者来说,阅读和分析v2.6.12版本的源码是一个很好的实践途径。

    Spark 核心思想与源码分析.7z

    《Spark核心思想与源码分析》是一份深入探讨Apache Spark技术的资料,旨在帮助读者理解Spark的内在工作原理,从而更好地应用和优化这个大数据处理框架。Spark作为一个分布式计算框架,以其高效、易用和可扩展性赢得...

    Spark源码分析3-The connect between driver,master and excutor

    《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...

    akka-ignite:Apache Ignite与Typesafe Akka的集成

    4. **事件驱动的实时处理**:Akka的事件驱动特性与Ignite的流处理能力相结合,可以实现实时的数据分析和响应。 5. **网络通信优化**:Akka的网络层与Ignite的通信机制集成,可以优化跨节点的数据传输,提高通信效率...

    flink源码分析

    《Flink源码分析》 Flink作为一款流行的开源大数据处理框架,以其高效的流处理和批处理能力在业界获得了广泛的应用。深入理解Flink的源码对于开发者来说至关重要,这不仅可以帮助我们更好地利用Flink的功能,还能让...

    akka-stream-xray:Akka 流可视化

    `akka-stream-xray-master`这个压缩包文件可能包含了项目的源代码,你可以通过阅读源码来了解如何集成和使用这个工具。 **集成Akka Stream X-Ray** 1. **添加依赖**:在你的`build.sbt`或`pom.xml`文件中,添加...

    Akka-Patterns:Akka工作拉动模式可防止邮箱溢出,限制和分配工作

    通过阅读和分析源码,我们可以学习如何在实际项目中实现工作拉动模式,优化Akka Actor系统的性能和稳定性。 总之,Akka工作拉动模式是处理并发问题的一种强大工具,它通过控制任务的分配和执行,有效地防止了邮箱...

    gossip-girl:实时同步服务器,使用 Akka & Redis

    通过对该项目的分析,我们将揭示Akka与Redis如何协同工作,以实现高效、可靠的分布式数据通信。 首先,让我们来了解Akka。Akka是由Lightbend公司开发的一个开源框架,专为构建高度并发、分布式和反应式的应用程序而...

    akkaflow-源码.rar

    《Akka流:深入解析源码》 ...通过深入理解和分析"akkaflow-源码.zip",不仅可以掌握Akka流的基本工作原理,还可以学习到高级的并发控制和数据处理技巧,这对于提升你的Scala和分布式系统开发能力具有极大帮助。

    scala 源码

    Akka库是Scala中实现Actor模型的重要框架,源码分析可以帮助我们了解Actor是如何工作的。 模式匹配是Scala的另一大特色,它不仅用于解构复杂数据结构,还能用于控制流程。在源码中,你会看到如何通过模式匹配写出...

    SparkCore:Spark核心分析,主要包含SparkContext源码,执行程序启动,阶段划分,任务执行和Spark2.0的新特性

    Spark(基于1.3.1)源码分析主要针对于Spark源码分析,对于比较重要的方法和代码,有注释,在熟悉的Spark源码之前,首先必须了解Akka的通信,如果不了解的可以看一下我的Demo,单击此处 ,这里主要进行的源码分析是...

    scala-2.11.8源码

    源码分析可以从以下几个方面展开: 1. **类型系统**:Scala的类型系统是其强大特性的基石。包括模式匹配、高阶类型、类型推断、隐式转换等。例如,`Any`, `AnyVal`, `AnyRef` 是Scala的基础类型,它们与Java的`...

    nbm-maven-plugin-3.13.2.zip

    在项目源码akka-tracing-master中,我们可以看到Akka Tracing的实现细节,包括如何启动和配置tracing,如何添加自定义事件,以及如何与其他工具进行集成。通过对源码的学习,开发者可以更深入地理解Akka Tracing的...

Global site tag (gtag.js) - Google Analytics