Akka 和 Storm 的设计差异
Akka 和 Storm 都是实现低延时, 高吞吐量计算的重要工具. 不过它们并非完全的竞品,
如果说 Akka 是 linux 内核的话, storm 更像是类似 Ubuntu 的发行版.然而 Storm
并非 Akka 的发行版, 或许说 Akka 比作 BSD, Storm 比作 Ubuntu 更合适.
实现的功能差异
Akka 包括了一套 API 和执行引擎.
Storm 除了 API 和执行引擎之外,还包括了监控数据,WEB界面,集群管理,消息传递保障机制.
此文讨论 Akka 和 Storm 重合的部分,也就是 API 和 执行引擎的异同.
API 差异
我们看下 Storm 两个主要的 API
public interface ISpout extends Serializable {
/**
* Called when a task for this component is initialized within a worker on the cluster.
* It provides the spout with the environment in which the spout executes.
*
* <p>This includes the:</p>
*
* @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
* @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
* @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
*/
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
/**
* Called when an ISpout is going to be shutdown. There is no guarentee that close
* will be called, because the supervisor kill -9's worker processes on the cluster.
*
* <p>The one context where close is guaranteed to be called is a topology is
* killed when running Storm in local mode.</p>
*/
void close();
/**
* Called when a spout has been activated out of a deactivated mode.
* nextTuple will be called on this spout soon. A spout can become activated
* after having been deactivated when the topology is manipulated using the
* `storm` client.
*/
void activate();
/**
* Called when a spout has been deactivated. nextTuple will not be called while
* a spout is deactivated. The spout may or may not be reactivated in the future.
*/
void deactivate();
/**
* When this method is called, Storm is requesting that the Spout emit tuples to the
* output collector. This method should be non-blocking, so if the Spout has no tuples
* to emit, this method should return. nextTuple, ack, and fail are all called in a tight
* loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
* to have nextTuple sleep for a short amount of time (like a single millisecond)
* so as not to waste too much CPU.
*/
void nextTuple();
/**
* Storm has determined that the tuple emitted by this spout with the msgId identifier
* has been fully processed. Typically, an implementation of this method will take that
* message off the queue and prevent it from being replayed.
*/
void ack(Object msgId);
/**
* The tuple emitted by this spout with the msgId identifier has failed to be
* fully processed. Typically, an implementation of this method will put that
* message back on the queue to be replayed at a later time.
*/
void fail(Object msgId);
}
以及
public interface IBasicBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context);
/**
* Process the input tuple and optionally emit new tuples based on the input tuple.
*
* All acking is managed for you. Throw a FailedException if you want to fail the tuple.
*/
void execute(Tuple input, BasicOutputCollector collector);
void cleanup();
}
和 akka 中 actor 的 api
trait Actor {
import Actor._
// to make type Receive known in subclasses without import
type Receive = Actor.Receive
/**
* Stores the context for this actor, including self, and sender.
* It is implicit to support operations such as `forward`.
*
* WARNING: Only valid within the Actor itself, so do not close over it and
* publish it to other threads!
*
* [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
* [[akka.actor.UntypedActorContext]], which is the Java API of the actor
* context.
*/
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
/**
* The 'self' field holds the ActorRef for this actor.
* <p/>
* Can be used to send messages to itself:
* <pre>
* self ! message
* </pre>
*/
implicit final val self = context.self //MUST BE A VAL, TRUST ME
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor,
* else `deadLetters` in [[akka.actor.ActorSystem]].
*
* WARNING: Only valid within the Actor itself, so do not close over it and
* publish it to other threads!
*/
final def sender(): ActorRef = context.sender()
/**
* This defines the initial actor behavior, it must return a partial function
* with the actor logic.
*/
//#receive
def receive: Actor.Receive
//#receive
/**
* INTERNAL API.
*
* Can be overridden to intercept calls to this actor's current behavior.
*
* @param receive current behavior.
* @param msg current message.
*/
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
/**
* Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
*/
protected[akka] def aroundPreStart(): Unit = preStart()
/**
* Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
*/
protected[akka] def aroundPostStop(): Unit = postStop()
/**
* Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
*/
protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)
/**
* Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
*/
protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
/**
* User overridable definition the strategy to use for supervising
* child actors.
*/
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
/**
* User overridable callback.
* <p/>
* Is called when an Actor is started.
* Actors are automatically started asynchronously when created.
* Empty default implementation.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
//#lifecycle-hooks
def preStart(): Unit = ()
//#lifecycle-hooks
/**
* User overridable callback.
* <p/>
* Is called asynchronously after 'actor.stop()' is invoked.
* Empty default implementation.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
//#lifecycle-hooks
def postStop(): Unit = ()
//#lifecycle-hooks
/**
* User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
* @param reason the Throwable that caused the restart to happen
* @param message optionally the current message the actor processed when failing, if applicable
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
//#lifecycle-hooks
def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child ⇒
context.unwatch(child)
context.stop(child)
}
postStop()
}
//#lifecycle-hooks
/**
* User overridable callback: By default it calls `preStart()`.
* @param reason the Throwable that caused the restart to happen
* <p/>
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
*/
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
//#lifecycle-hooks
def postRestart(reason: Throwable): Unit = {
preStart()
}
//#lifecycle-hooks
/**
* User overridable callback.
* <p/>
* Is called when a message isn't handled by the current behavior of the actor
* by default it fails with either a [[akka.actor.DeathPactException]] (in
* case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
* to the actor's system's [[akka.event.EventStream]]
*/
def unhandled(message: Any): Unit = {
message match {
case Terminated(dead) ⇒ throw new DeathPactException(dead)
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
}
}
}
可以说 Storm 主要的 API 和 Actor 非常相像, 不过从时间线上看 Storm 和 Akka
都是从差不多的时间开始开发的,因此很有可能 Storm 是作者受了 Erlang 的 Actor 实现启发而写的.
从目前的状况看来, 很有可能作者想用 Clojure 语言写一个"朴素"的 Actor 实现, 然而这个"朴素"实现已经满足了 Storm 的设计目标, 所以作者也没有继续把 Storm 变成一个 Actor 在 clojure 上的完整实现.
那么,仅仅是从 API 上看的话 Spout/Bolt 和 Actor 的差异有哪些呢?
Storm API 比 Actor 多的地方
Storm 在 API 上比 Actor 多了 ack 和 fail 两个接口. 有这两个接口主要是因为 Storm 比 Akka 的应用场景更加细分(基本上只是用于统计), 所以已经做好了容错机制,能让在这个细分领域的用户达到开箱可用.
另外,在 Storm 的 Tuple 类中存储着一些 context 信息,也是出于目标使用场景的需求封装的.
Actor API 比 Storm 多的地方
context: Spout 的 open 方法里也有 context, 然而 context 在 actor 中是随时可以调用的,表明 Actor 比 Spout 更加鼓励用户使用 context, context 中的数据也会动态更新.
self: Actor对自身的引用,可以理解为 Actor 模型更加支持下游收到数据的组件往上游回发数据的行为,甚至自己对自己发数据也可以.在 Storm 中,我们默认数据发送是单向的,下游接收的组件不会对上游有反馈(除了系统定义的ack,和fail)
postRestart: 区分 Actor 的第一次启动和重启, 还是蛮有用的,Storm 没有应该是最初懒得写或者没想到,后来又不想改核心 API.
unhandled: 对没有预期到会发送给自身的消息做处理,默认是传到一个系统 stream,因为 Actor 本身是开放的,外部应用只要知道这个 Actor 的地址就能发消息给它.Storm 本身只接收你为它设计好的消息,所以没有这个需求.
运行时差异
Actor 和 Task 的比较, 线程调度模型的不同, 以及代码热部署,Storm 的 ack 机制对异步代码的限制等.
Actor 和 Component 的比较
Component 是 Spout 和 Bolt 的总称,是 Storm 中执行用户代码的基本组件. 共同点是都根据消息做出响应,也能够存储内容,一次只有一个线程进入,除非你手动另外开启线程.主要的区别在于 Actor 是非常轻量的组件,你可以在一个程序里创建几万个 Actor, 或者每十行代码都在一个 Actor 里, 这都没有问题. 然而换成 Storm 的Component, 情况都不一样了,你最好只用若干个 Component 来描述顶层抽象.
线程调度模型
API 很相似,为什么 Actor 可以随便开新的, Component 就要尽量少开呢? 秘密都在 Akka 的调度器(Dispatchers)里. Akka 程序的所有异步代码,包括 Actor,Future,Runnable 甚至ParIterable, 可以说除了你要用主线程启动ActorSystem外,其他所有线程都可以交给Dispatcher管理.Dispatcher 可以自定义,默认的情况下采用了 "fork-join-executor",相对于一般的线程池,fork-join-executor 特别适合 Actor模型,可以提供相当优异的性能.
相比较的, Storm 的线程调度模型就要"朴素"很多,就是每个 Component 一个线程,或者若干个Component轮流共用一个线程,这也就是为什么Component不能开太多的原因.
代码热部署
实时计算方面,热部署的需求主要是诸如修改排序算法之类的,替换某个算法模块,其他东西不变.
因为 Storm 是可以通过 Thrift 支持任何语言编程的,所以你如果是用python之类的脚本语言写的算法,想要换掉算法而不重启,那只要把每台机器上相应位置的py文件替换掉就好了.不过这样就会让程序限定在用此类语言实现.
Akka 方面, 因为 Actor 模型对进程内和进程间的通信接口都是统一的, 可以负责算法的一类 Actor 作为单独的进程启动,代码更新了就重启这个进程. 虽然系统中有一个进程重启了,但是整个系统还是可以一刻不停地运转.
Storm 中的 Ack 机制
Storm 的消息保障机制是 具有独创性的, 利用位亦或能够用非常小的内存,高性能地掌握数据处理过程中的成功或失败情况. 默认的情况下,在用户的代码中只需要指定一个MessageId, Ack 机制就能愉快地跑起来了. 所以通常用户不用关心这块内容, 但是默认接口的问题就是, 一旦使用了异步程序, ack 机制就会失效,包括 schedule 和 submit runnable 等行为,都不会被 Ack 机制关心,也就是说异步逻辑执行失败了,acker也不知道. 如何能让 Storm 的 Ack 机制与异步代码和谐相处,还是一个待探讨的问题.
总结
我认为 Storm 的 API 是优秀的, 可靠性也是在若干年的实践中得到证实的, 然而其核心运转机制过于朴素又给人一种烈士暮年的感觉. Storm 最初的使用者 Twitter 也在不久前公布了他们兼容 Storm 接口的新的解决方案 Heron, 不过并没有开源. 如果有开源方案能够基于 Akka "重新实现" 一个 Storm,那将是非常令人期待的事情. 我目前发现 gearpump是其中一个.
参考资料
https://segmentfault.com/a/1190000003886656
http://blog.csdn.net/jmppok/article/details/17267585
http://www.linuxidc.com/Linux/2015-12/126439.htm
相关推荐
另外,本书介绍了 Actor 模型的一个实现框架 Akka 以及它的工具,而后讨论了在充分利用 actor 架构的基础上使用 Akka 框架来设计软件系统的方法,以及使用它来开发并发性和分布式应用程序的方怯。本书还介绍了领域 ...
Akka是一个用于构建分布式、高并发、容错性应用程序的开源库和运行时环境,它建立在Actor模型上。在2014年版本的《Akka实战》一书中,作者深入讲解了Akka的核心概念、设计思想以及如何利用Akka构建实际的应用程序。 ...
AKKA强调的是“高并发设计”,这包括高CPU利用率、低延迟、高吞吐量和良好的可扩展性。它通过消息传递的方式实现这些特性,而不是依赖于共享状态。在AKKA中,开发者不需要考虑线程、锁、并发集合和线程通知等底层...
综上所述,文档涉及的知识点丰富,覆盖了Actor模型和Akka框架的介绍,分布式应用程序设计实践,领域驱动设计方法,以及如何将这些理论和实践结合起来创建高效的分布式系统。同时,文档中还含有书籍的版权和出版信息...
《Akka应用模式:分布式应用程序设计实践指南》是一本深入探讨Akka框架的书籍,旨在帮助开发者理解和掌握在分布式环境中构建高效、容错的应用程序。Akka是基于Actor模型的并行和分布式计算库,使用Scala和Java语言,...
Akka 是一个强大的工具包和框架,主要用于构建高度并发、分布式和反应式的应用程序,它基于actor模型。在Java中,Akka可以用来实现TCP远程调用,这使得不同系统之间能够通过网络进行通信。下面我们将深入探讨如何...
Akka.NET是一个强大的工具包,它为.NET开发者提供了一种实现高性能、高可扩展性和容错性的分布式计算的途径。这个框架受到了Scala平台上的Akka的启发,将Actor模型引入了.NET世界。在这个"**c#分布式框架akka范例**...
它的核心设计理念是Actor模型,这使得它能够处理大量的并发操作,同时保持系统的可扩展性和容错性。 Akka 实例参考通常会包含一系列的代码示例,这些示例旨在帮助初学者理解并掌握Akka的关键概念和用法。以下是一些...
Akka是用Scala编写的,但在Java中也可以方便地使用,它提供了一个强大的actor模型来处理并发问题,使得在高度并行和分布式系统中编程变得更加简单。在这个框架中,协程(或称为actors)被用来替代传统的多线程,以...
Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...
Akka是一个用Scala语言开发的高性能、分布式计算框架,它主要在Java和Scala环境中使用。Akka库的设计理念是提供一种高效、容错且可扩展的解决方案,用于构建高度并发、分布式以及反应式的应用程序。标题中的"Akka...
《Learning Akka》这本书是关于Akka框架的深入学习指南,它主要面向那些希望掌握分布式系统设计和并行计算技术的开发人员。Akka是一个用Scala语言构建的高性能、容错的工具包,广泛应用于Java和Scala环境中。本书...
Akka框架是基于Java虚拟机(JVM)平台的一个强大工具,它专为构建高可用性、高并发和分布式系统而设计。Akka的核心理念是利用actor模型来处理并发问题,提供了一种简单而高效的方式来管理复杂的并发状态,从而避免了...
Akka 作为一种强大的工具集,不仅提供了丰富的 API 和设计模式来构建高并发、分布式系统,而且还强调了响应式编程的重要性。通过学习 Akka,开发者能够更加轻松地面对现代软件开发中的挑战,构建出既高效又可靠的...
Akka是一个基于Scala语言的开发框架,由Jonas Boner主持开发,旨在为Java程序员提供一套强大的并发工具和抽象模型。由于Akka基于JVM,因此它也兼容包括Java在内的其他运行在JVM上的语言。Akka的核心思想是利用Actor...
Akka是一个用Scala和Java编写的开源工具包和运行时,用于构建并发、分布式和容错的事件驱动应用程序。Akka基于Actor模型,这是一种并发模型,可以在多核CPU系统上有效地进行扩展。在给出的文件中,提到了Akka V2.3.6...
**Akka** 是一个为构建高并发、分布式和容错系统而设计的工具包和运行时环境。本教学内容将详细介绍 Akka 的基本概念、核心功能以及如何在 Java 中使用 Akka 来构建强大的应用程序。 ##### 1.1 什么是 Akka? Akka ...
介绍 Akka Streams 的设计理念和优势。 ##### 8.2 快速入门指南 提供一个快速上手的示例。 ##### 8.3 Reactive Tweets 使用 Akka Streams 处理 Twitter 数据流的示例。 ##### 8.4 Akka Streams 设计原则 解释 ...