Storm深入理解
Storm是一个免费开源、分布式、高容错的实时计算系统。
相关示例项目:Leek——简易版实时智能选股平台
一、Storm集群架构
-
Nimbus :Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。
-
Supervisor :Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。
-
ZooKeeper :用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行。
二、Storm组件抽象
一个Topology的Spout/Bolt对应的多个Task可能分布在多个Supervisor的多个Worker内部。而每个Worker内部又存在多个Executor,根据实际对Topology的配置在运行时进行计算并分配。
- Topology :Storm对一个分布式计算应用程序的抽象,目的是通过一个实现Topology能够完整地完成一件事情(从业务角度来看)。一个Topology是由一组静态程序组件(Spout/Bolt)、组件关系Streaming Groups这两部分组成。
- Spout :描述了数据是如何从外部系统(或者组件内部直接产生)进入到Storm集群,并由该Spout所属的Topology来处理,通常是从一个数据源读取数据,也可以做一些简单的处理(为了不影响数据连续地、实时地、快速地进入到系统,通常不建议把复杂处理逻辑放在这里去做)。
- Bolt :描述了与业务相关的处理逻辑。
上面都是一些表达静态事物(组件)的概念,我们编写完成一个Topology之后,上面的组件都以静态的方式存在。下面,我们看一下提交Topology运行以后,会产生那些动态的组件(概念):
- Task :Spout/Bolt在运行时所表现出来的实体,都称为Task,一个Spout/Bolt在运行时可能对应一个或多个Spout Task/Bolt Task,与实际在编写Topology时进行配置有关。
- Worker :运行时Task所在的一级容器,Executor运行于Worker中,一个Worker对应于- - Supervisor上创建的一个JVM实例
- Executor :运行时Task所在的直接容器,在Executor中执行Task的处理逻辑;一个或多个Executor实例可以运行在同一个Worker进程中,一个或多个Task可以运行于同一个Executor中;在Worker进程并行的基础上,Executor可以并行,进而Task也能够基于Executor实现并行计算
三、Storm的流分组策略
Storm中最重要的抽象,应该就是Stream grouping了,它能够控制Spot/Bolt对应的Task以什么样的方式来分发Tuple,将Tuple发射到目的Spot/Bolt对应的Task,如下图所示:
- Shuffle grouping: This randomly distributes tuples across the target bolt's tasks such that each bolt receives an equal number of tuples.(随机分配tuple到不同的task中,保证均匀分配)
- Fields grouping: This routes tuples to bolt tasks based on the values of the fields specified in the grouping. For example, if a stream is grouped on the "word" field, tuples with the same value for the "word" field will always be routed to the same bolt task.(根据每个tuble的field值来分配到不同的task中,保证相同的值到相同的task中)
- All grouping: This replicates the tuple stream across all bolt tasks such thateach task will receive a copy of the tuple.(每个tuple被复制发送到所有相关的task中)
- Global grouping: This routes all tuples in a stream to a single task, choosingthe task with the lowest task ID value. Note that setting a parallelism hint or number of tasks on a bolt when using the global grouping is meaningless since all tuples will be routed to the same bolt task. The global grouping should be used with caution since it will route all tuples to a single JVM instance, potentially creating a bottleneck or overwhelming a specific JVM/machine in a cluster.(每个tuple会被发送到一个ID最小的task里面,慎用!容易引起性能问题)
- None grouping: The none grouping is functionally equivalent to the shuffle grouping. It has been reserved for future use.(不分组,效果和shuffle grouping差不多)
- Direct grouping: With a direct grouping, the source stream decides whichcomponent will receive a given tuple by calling the emitDirect() method.It and can only be used on streams that have been declared direct streams.(由Tupe的生产者来决定发送给下游的哪一个Bolt的Task ,这个要在实际开发编写Bolt代码的逻辑中进行精确控制)
- Local or shuffle grouping:The local or shuffle grouping is similar to the shuffle grouping but will shuffle tuples among bolt tasks running in the same worker process, if any. Otherwise, it will fall back to the shuffle grouping behavior. Depending on the parallelism of a topology, the local or shuffle grouping can increase topology performance by limiting network transfer.(如果目标Bolt有1个或多个Task都在同一个Worker进程对应的JVM实例中,则Tuple只发送给这些Task)
-
Own Stream Grouping : you can define your own stream grouping by implementing the CustomStreamGrouping interface(通过实现CustomStreamGrouping接口自定义分组)
四、Topology并行度计算
官网的栗子:
conf.setNumWorkers(2); // 该Topology运行在Supervisor节点的2个Worker进程中
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 设置并行度为2,则Task个数为2*1
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout"); // 设置并行度为2,设置Task个数为4 ,则Task个数为4
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt"); // 设置并行度为6,则Task个数为6*1
那么,下面我们看Storm是如何计算一个Topology运行时的并行度,并分配到2个Worker中的:
- 计算Task总数:2乘1+4+6乘1=12(总计创建12个Task实例)
- 计算运行时Topology并行度:10/2=5(每个Worker对应5个Executor)
- 将12个Task分配到2个Worker中的5*2个Executor中:应该是每个Worker上5个Executor,将6个Task分配到5个Executor中
- 每个Worker中分配6个Task,应该是分配3个Yellow Task、2个Green Task、1个Blue Task
- Storm内部优化:会把同类型的Task尽量放到同一个Executor中运行
- 分配过程:从Task个数最少的开始,1个Blue Task只能放到一个Executor,总计1个Executor被占用;2个Green Task可以放到同一个Executor中,总计2个Executor被占用;最后看剩下的3个Yellow Task能否分配到5-2=3个Executor中,显然每个Yellow Task对应一个Executor
五、Bolt生命周期
Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出。实现一个bolt时,通常需要实现IRichBolt接口。Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机。然后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。
//为bolt声明输出模式
declareOutputFields(OutputFieldsDeclarer declarer)
//仅在bolt开始处理元组之前调用
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
//处理输入的单个元组
execute(Tuple input)
//在bolt即将关闭时调用
cleanup()
六、在Storm上的topology的生命周期如下:
- 上传代码并做校验(/data/nimbus/inbox);
- 建立本地目录(/data/nimbus/stormdist/topology-id/);
- 建立zookeeper上的心跳目录;
- 计算topology的工作量(parallelism hint),分配task-id并写入zookeeper;
- 把task分配给supervisor执行;
- 在supervisor中定时检查是否有新的task,下载新代码、删除老代码,剩下的工作交个小弟worker;
- 在worker中把task拿到,看里面有哪些spout/Bolt,然后计算需要给哪些task发消息并建立连接;
- 在nimbus将topology终止的时候会将zookeeper上的相关信息删除;
七、消息的可靠处理机制
Storm内部通过一种巧妙的异或算法判读每个tuple是否被正确完整的处理。
- Spout的一个Task创建一个Tuple时,即在Spout的nextTuple()方法中实现从特定数据源读取数据的处理逻辑中,会与Acker进行通信,向Acker发送消息,Acker保存该Tuple对应信息:{:spout-task task-id :val ack-val)}。
- Bolt在emit一个新的子Tuple时,会保存子Tuple与父Tuple的关系。
- 在Bolt中进行ack时,会计算出父Tuple与由该父Tuple新生成的所有子Tuple的一个异或值,将该值发送给Acker(计算异或值:tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 … ^ child-tuple-idN))。可见,这里Bolt并没有把所有生成的子Tuple发送给Acker,这要比发送一个异或值大得多了,只发送一个异或值大大降低了Bolt与Acker之间网络通信的开销。
- Acker收到Bolt发送的异或值,与当前保存的task-id对应的初始ack-val做异或,tuple-id与ack-val相同,异或结果为0,但是子Tuple的child-tuple-id等并不互相相同,只有等所有的子Tuple的child-tuple-id都执行ack回来,最后ack-val就为0,表示整个Tuple树处理成功。无论成功与失败,最后都要从Acker维护的队列中移除。
- 最后,Acker会向产生该原始父Tuple的Spout对应的Task发送通知,成功或者失败,回调Spout的ack或fail方法。如果我们在实现Spout时,重写了ack和fail方法,处理回调就会执行这里的逻辑。
当然这种异或算法存在1/2^64概率的误差,可以忽略不计。
在开发中,对于那些不允许丢失的消息我们在发送消息时要对tuple指定messageID并进行锚定,告诉tuple tree这里增加了一个新的节点,保证消息的可靠性。
collector.emit(tuple,messageId)//可靠消息
collector.emit(tuple)//不可靠的消息
collector.emit(tuple, new Values(word));//锚定发送,可靠的消息
collector.emit(new Values(word)));//非锚定发送,不可靠的消息
注意:继承BaseBasicBolt实现的API本是就是可靠性的,不需要自己进行锚定发送和调用ack以及fail方法。
八、Storm的容错机制
1、任务级容错
- Bolt任务crash引起的消息未被应答。此时,acker中所有与此Bolt任务关联的消息都会因为超时而失败,对应的Spout的fail方法将被调用。
- acker任务失败。如果acker任务本身失败了,它在失败之前持有的所有消息都将超时而失败。Spout的fail方法将被调用。
- Spout任务失败。在这种情况下,与Spout任务对接的外部设备(如MQ)负责消息的完整性。例如,当客户端异常时,kestrel队列会将处于pending状态的所有消息重新放回队列中。
2、任务槽(slot)故障
- Worker失败。每个Worker中包含数个Bolt(或Spout)任务。Supervisor负责监控这些任务,当worker失败后会尝试在本机重启它,如果它在启动时连续失败了一定的次数,无法发送心跳信息到Nimbus,Nimbus将在另一台主机上重新分配worker。
- Supervisor失败。Supervisor是无状态(所有的状态都保存在Zookeeper或者磁盘上)和快速失败(每当遇到任何意外的情况,进程自动毁灭)的,因此Supervisor的失败不会影响当前正在运行的任务,只要及时将他们重新启动即可。
- Nimbus失败。Nimbus也是无状态和快速失败的,因此Nimbus的失败不会影响当前正在运行的任务,但是当Nimbus失败时,无法提交新的任务,只要及时将它重新启动即可。
3、集群节点(机器):
- Storm集群中的节点故障。此时Nimbus会将此机器上所有正在运行的任务转移到其他可用的机器上运行。
- Zookeeper集群中的节点故障。Zookeeper保证少于半数的机器宕机系统仍可正常运行,及时修复故障机器即可。
九、Storm's DRPC Server
DRPC Server整体工作过程:
(1)接受一个RPC请求
(2)发送请求到Storm Topology
(3)执行相应操作
(4)把结果发回给客户端
参考资料:
- http://storm.apache.org/
- http://storm.apache.org/documentation.html
- http://storm.apache.org/documentation/Guaranteeing-message-processing.html
- http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html
- 《Storm Blueprints : Patterns for Distributed Real-time Computation》
- 《Getting Started With Storm》
- 《Learning Storm》
- 《Storm Real-Time Event Processing》
相关推荐
在深入理解 Storm 的核心概念和特性之前,首先需要知道它的记录级容错原理,这是 Storm 强大功能的基础。 **记录级容错原理** Storm 允许 Spout 在发射源 Tuple 时指定一个 Message ID,这个 ID 可以是任意对象,...
《Storm深入学习》 在大数据实时处理领域,Apache Storm是一个不可或缺的工具,它提供了一种高效、可扩展的方式来处理无界的数据流。本篇深入探讨了Storm的核心概念和使用技巧,包括基本Bolt的实现、批处理策略、...
本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...
4. `docs` 目录:文档和用户指南,帮助开发者了解如何使用 Storm。 5. `examples` 目录:示例项目,用于演示如何构建和运行 Storm 作业。 6. `jars` 或 `extlib` 目录:用户可以放置自定义的库文件,以便在 Storm 中...
Storm是一个开源的分布式实时计算系统,由Twitter开发并开源,其设计目标是让实时处理变得简单、强大且可靠。在Storm中,数据流被抽象...通过深入理解和使用这个JAR包,开发者可以构建出高效、可靠的实时数据处理系统。
标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。...通过学习和实践这些材料,开发者可以深入理解Storm的工作原理,掌握实时数据处理的基本技能。
PDF版资料通常包括教程、用户手册、技术文档等,帮助用户深入理解和应用Storm。这些资料可能涵盖以下关键知识点: 1. **Storm架构**:Storm由多个组件构成,如Nimbus(主控节点)、Supervisor(工作节点)、Worker...
在提供的文件名“storm-book-examples-ch02-getting_started-8e42636”中,我们可以推断这是某个关于Storm入门的章节,可能包含逐步指导和示例代码,帮助读者了解如何开始使用Storm。另一文件名“storm-book-...
通过"storm demo"项目,你将有机会深入了解如何利用Apache Storm搭建实时数据处理系统,掌握Spout、Bolt和Topology的使用,以及如何在实际场景中实现数据的实时分析和处理。此外,该项目也鼓励你去探索和理解分布式...
在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个实时的词频统计应用。 首先,我们需要理解Storm的基本架构。Storm由多个组件构成,包括Spout(数据源)、Bolt...
深入理解Storm的源码对于更好地利用Storm进行开发和优化是非常重要的。Storm的核心源码主要包括以下几个方面: 1. **Nimbus和Supervisor的实现**:这部分代码实现了Nimbus和Supervisor的主循环逻辑,包括任务分配、...
《深入理解Storm流处理框架与JAR包应用》 Storm是一个强大的实时大数据处理框架,由Twitter开源并广泛应用于实时分析、在线机器学习、持续计算、分布式RPC等多种场景。它的核心概念是拓扑(Topology),通过定义...
对于通信的每个细节,最好的理解方式是深入查看GUI源代码。GUI源代码包含在任何固件包中,使用Perl语言编写,Perl语言足够原始易懂,可以容易地理解代码内容。 值得注意的是,当使用Simple Commands或RC Commands时...
为了深入理解这个入门示例,用户需要解压文件,查看源码,了解如何创建Storm拓扑,以及如何处理数据流。这可能涉及到以下几个关键知识点: 1. **Storm概念**:理解Spout和Bolt的概念,以及它们在实时数据处理中的...
Apache Storm 是一个开源的分布式实时计算系统,它允许...此外,理解Java编程和理解数据流处理的概念对于编写高效、可靠的Storm Topology至关重要。通过不断实践和优化,Storm可以在大数据实时处理领域发挥巨大作用。
《storm实时数据处理》这本书深入探讨了Apache Storm这一强大的实时计算系统,它是大数据处理领域中的重要工具,尤其在实时流处理方面具有显著优势。Storm设计的核心理念是简单、可扩展和容错性,使得它在处理大规模...
标题中的"storm0.9.0jar包"指的是Apache Storm的0.9.0版本的JAR文件。Apache Storm是一个开源的分布式实时计算系统,它...通过深入学习Storm的架构、API和最佳实践,开发者可以构建出高效、可靠的实时数据处理系统。
总的来说,"storm-starter-master"是学习和探索Apache Storm实时数据处理能力的理想起点,通过这个项目,开发者可以深入理解流式数据处理的基本概念,以及如何利用Storm构建可扩展、容错的实时处理系统。通过实践...
总的来说,分析Apache Storm 0.9.5的源码,我们可以深入了解其设计思想、内部机制和实现细节,这对于开发者在实际项目中优化性能、解决故障、定制功能都有极大的帮助。同时,这也为理解后续版本的改进和发展提供了...