转载:http://xumingming.sinaapp.com/647/twitter-storm-code-analysis-topology-execution/
我们通过前面的文章(Twitter Storm源代码分析之ZooKeeper中的目录结构)知道了storm集群里面nimbus是通过zookeeper来给supervisor发送指令的,并且知道了通过zookeeper到底交换了哪些信息。 那么一个topology从提交到执行到底是个什么样的过程?nimbus和supervisor到底做了什么样的事情呢?本文将带你去探寻这些答案。
代码列表
如何提交一个topology?
要提交一个topology给storm的话, 我们在命令行里面是这么做的:
1
|
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3 |
那么在这个命令的背后,storm集群里面发生了什么呢?
storm里的幕后英雄:nimbus,supervisor
看似简单的topology提交, 其实背后充满着血雨腥风(好吧,我夸张了), 我们来看看我们的幕后英雄nimbus, supervisor都做了什么。
上传topology的代码
首先由Nimbus$Iface
的beginFileUpload
, uploadChunk
以及finishFileUpload
方法来把jar包上传到nimbus服务器上的/inbox目录
1
2
3
4
5
6
7
8
9
|
/{storm-local-dir}
|
|-/nimbus
|
|-/inbox -- 从nimbus客户端上传的jar包
| 会在这个目录里面
|
|-/stormjar-{uuid}.jar -- 上传的jar包其中{uuid}表示
生成的一个uuid
|
运行topology之前的一些校验
topology的代码上传之后Nimbus$Iface
的submitTopology
方法会负责对这个topology进行处理, 它首先要对storm本身,以及topology进行一些校验:
- 它要检查storm的状态是否是active的
- 它要检查是否已经有同名的topology已经在storm里面运行了
- 因为我们会在代码里面给spout, bolt指定id, storm会检查是否有两个spout和bolt使用了相同的id。
- 任何一个id都不能以”__”开头, 这种命名方式是系统保留的。
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
(check-storm-active! nimbus storm-name false) ( defn validate-topology! [ topology ]
( let [ bolt-ids ( keys (.get_bolts topology))
spout-ids ( keys (.get_spouts topology))
state-spout-ids ( keys (.get_state_spouts topology))
; 三种id之间有没有交集?
common (any-intersection bolt-ids spout-ids state-spout-ids) ]
; 这些id之间是不能有交集的: spout的id和bolt的id是不能一样的
(when-not (empty? common)
(throw
(InvalidTopologyException.
(str "Cannot use same component id for both spout and bolt: "
(vec common))
)))
; 用户定义的id不能以__开头, 这些是系统保留的
(when-not (every?
(complement system-component?)
(concat bolt-ids spout-ids state-spout-ids))
(throw
(InvalidTopologyException.
"Component ids cannot start with '__'")))
;; TODO: validate that every declared stream is not a system stream
))
|
如果以上检查都通过了,那么就进入下一步了。
建立topology的本地目录
然后为这个topology建立它的本地目录:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
/{storm-local-dir}
|
|-/nimbus
|
|-/inbox -- 从nimbus客户端上传的jar包
| | 会在这个目录里面
| |
| |-/stormjar-{uuid}.jar -- 上传的jar包其中{uuid}表示
| 生成的一个uuid
|
|-/stormdist
|
|-/{topology-id}
|
|-/stormjar.jar -- 包含这个topology所有代码
| 的jar包(从nimbus/inbox
| 里面挪过来的)
|
|-/stormcode.ser -- 这个topology对象的序列化
|
|-/stormconf.ser -- 运行这个topology的配置
|
对应的代码:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
( defn - setup-storm-code
[ conf storm-id tmp-jar-location storm-conf topology ]
( let [ stormroot (master-stormdist-root conf storm-id) ]
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile
(File. (master-stormcode-path stormroot))
(Utils/serialize topology))
(FileUtils/writeByteArrayToFile
(File. (master-stormconf-path stormroot))
(Utils/serialize storm-conf))
))
|
建立topology在zookeeper上的心跳目录
nimbus老兄是个有责任心的人, 它虽然最终会把任务分成一个个task让supervisor去做, 但是他时刻都在关注着大家的情况, 所以它要求每个task每隔一定时间就要给它打个招呼(心跳信息), 以让它知道事情还在正常发展, 如果有task超时不打招呼, nimbus会认为这个task不行了, 然后进行重新分配。zookeeper上面的心跳目录:
1
2
3
4
5
6
7
8
|
|-/taskbeats -- 所有task的心跳
|
|-/{topology-id} -- 这个目录保存这个topology的所
| 有的task的心跳信息
|
|-/{task-id} -- task的心跳信息,包括心跳的时
间,task运行时间以及一些统计
信息
|
计算topology的工作量
nimbus是个精明人, 它对每个topology都会做出详细的预算:需要多少工作量(多少个task)。它是根据topology定义中给的parallelism hint参数, 来给spout/bolt来设定task数目了,并且分配对应的task-id。并且把分配好task的信息写入zookeeper上的/task目录下:
1
2
3
4
5
6
7
8
9
|
|-/tasks -- 所有的task
|
|-/{topology-id} -- 这个目录下面id为
| {topology-id}的topology
| 所对应的所有的task-id
|
|-/{task-id} -- 这个文件里面保存的是这个
task对应的component-id:
可能是spout-id或者bolt-id
|
从上图中注释中看到{task-id}这个文件里面存储的是它所代表的spout/bolt的id, 这其实就是一个细化工作量的过程。
打比方说我们的topology里面一共有一个spout, 一个bolt。 其中spout的parallelism是2, bolt的parallelism是4, 那么我们可以把这个topology的总工作量看成是6, 那么一共有6个task,那么/tasks/{topology-id}下面一共会有6个以task-id命名的文件,其中两个文件的内容是spout的id, 其它四个文件的内容是bolt的id。
看代码:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
(.setup-heartbeats! storm-cluster-state storm-id) (setup-storm-static conf storm-id storm-cluster-state) ( defn - setup-storm-static [ conf storm-id storm-cluster-state ]
( doseq [ [ task-id component-id ] (mk-task-component-assignments conf storm-id) ]
(. set -task! storm-cluster-state storm-id task-id (TaskInfo. component-id))
))
( defn mk-task-maker [ max-parallelism parallelism-func id-counter ]
( fn [ [ component-id spec ] ]
( let [ parallelism (parallelism-func spec)
parallelism ( if max-parallelism (min parallelism max-parallelism) parallelism)
num-tasks (max 1 parallelism) ]
(for-times num-tasks
[ (id-counter) component-id ] )
)))
|
把计算好的工作分配给supervisor去做
然后nimbus就要给supervisor分配工作了。工作分配的单位是task(上面已经计算好了的,并且已经给每个task编号了), 那么分配工作意思就是把上面定义好的一堆task分配给supervisor来做, 在nimbus里面,Assignment表示一个topology的任务分配信息:
1
2
|
(defrecord Assignment [ master-code-dir
node->host task->node+port task->start-time-secs ] )
|
其中核心数据就是task->node+port, 它其实就是从task-id到supervisor-id+port的映射, 也就是把这个task分配给某台机器的某个端口来做。 工作分配信息会被写入zookeeper的如下目录:
01
02
03
04
05
06
07
08
09
10
11
|
/-{storm-zk-root} -- storm在zookeeper上的根
| 目录
|
|-/assignments -- topology的任务分配信息
|
|-/{topology-id} -- 这个下面保存的是每个
topology的assignments
信息包括: 对应的
nimbus上的代码目录,所有
task的启动时间,
每个task与机器、端口的映射
|
TODO: 补充工作分配的细节
正式运行topology
到现在为止,任务都分配好了,那么我们可以正式启动这个topology了,在源代码里面,启动topology其实就是向zookeeper上面该topology所对应的目录写入这个topology的信息:
1
2
3
4
5
6
7
8
|
|-/storms -- 这个目录保存所有正在运行
| 的topology的id
|
|-/{topology-id} -- 这个文件保存这个topology
的一些信息,包括topology的
名字,topology开始运行的时
间以及这个topology的状态
(具体看StormBase类)
|
看代码:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
( defn - start-storm
[ storm-name storm-cluster-state storm-id ]
(log-message "Activating " storm-name ": " storm-id)
(.activate-storm! storm-cluster-state
storm-id
(StormBase. storm-name
(current-time-secs)
{ :type :active })))
(activate-storm! [ this storm-id storm-base ]
; 把这个topology的信息(StormBase)
; 写入/storms/{topology-id}这个文件
(set-data cluster-state (storm-path storm-id)
(Utils/serialize storm-base))
)
|
好!nimbus干的不错,到这里为止nimbus的工作算是差不多完成了,它对topology进行了一些检查,发现没什么问题, 然后又评估了一下工作量, 然后再看看它的小弟们(supervisor)哪些有空,它进行了合理的分配,所有的事情都安排妥当了,nimbus终于可以松一口气了。下面就看supervisor的了。
Supervisor领任务
我们的supervisor同志无时无刻不想着为大哥nimbus分忧, 它每隔几秒钟就去看看大哥有没有给它分配新的任务,这些逻辑主要在supervisor.clj里面的synchronize-supervisor
和sync-processes
两个方法里面它:
- 首先它看看storm里面有没有新提交的它没有下载的topology的代码, 如果有的话, 它就把这个新topology的代码下载下来。它可不管这个topology由不由它负责哦(这一点是可以优化的)
01020304050607080910111213141516
(
doseq
[
[
storm-id master-code-dir
]
storm-code-
map
]
(
when
-not (downloaded-storm-ids storm-id)
(log-message
"Downloading code for storm id "
storm-id
" from "
master-code-dir)
; 从nimbus上下载这个topology的代码
(download-storm-code conf storm-id
master-code-dir)
(log-message
"Finished downloading code for storm id "
storm-id
" from "
master-code-dir)
))
- 然后它会删除那些已经不再运行的topology的代码
123456
(
doseq
[
storm-id downloaded-storm-ids
]
(
when
-not (assigned-storm-ids storm-id)
(log-message
"Removing code for storm id "
storm-id)
(rmr (supervisor-stormdist-root conf storm-id))
))
- 然后他根据老大哥nimbus给它指派的任务信息(task-id对应到的topology的spout或者bolt), 来让它自己的小弟:worker来做这个事情
010203040506070809101112131415161718192021
(dofor
[
[
port assignment
]
reassign-tasks
]
(
let
[
id (
new
-worker-ids port)
]
(log-message
"Launching worker with assignment "
(pr-
str
assignment)
" for this supervisor "
supervisor-id
" on port "
port
" with id "
id
)
; 启动一个worker(supervisor+port)
; 来处理assignments
(launch-worker conf
shared-context
(:storm-id assignment)
supervisor-id
port
id
worker-thread-pids-atom)
id))
Worker执行
worker是个苦命的人, 上面的nimbus, supervisor只会指手画脚, 它要来做所有的脏活累活。
- 1. 它首先去zookeeper上去看看老大哥们都给他分配了哪些task(task-ids)
0102030405060708091011121314
(
defn
read-worker-task-ids
[
storm-cluster-state storm-id supervisor-id port
]
(
let
[
assignment
(
:task-
>
;node+port
(.assignment-info
storm-cluster-state storm-id nil))
]
(
doall
(mapcat (
fn
[
[
task-id loc
]
]
; 找出这个worker(supervisor+port)的tasks
(
if
(= loc
[
supervisor-id port
]
)
[
task-id
]
))
assignment))
))
- 2. 然后根据这些task-id来找出所对应的topology的spout/bolt
12
task->
;component (storm-task-info
storm-cluster-state storm-id)
- 3. 计算出它所代表的这些spout/bolt会给哪些task发送消息
12345
; task-ids是这个worker所负责的那些task, 那么
; worker-outbound-tasks函数的结果就是这些task
; 的消息要发送的task(supervisor+port)
outbound-tasks (worker-outbound-tasks
task->component mk-topology-context task-ids)
- 4. 建立到3里面所提到的那些task的连接(socket), 然后在需要发送消息的时候就通过这些socket来发送
01020304050607080910111213
(swap! node+port->
;socket
merge
(into {}
(dofor
[
[
node port
:as
endpoint
]
new
-connections
]
[
endpoint
; msg/connect函数返回的就是从这个worker的端口
; 到目的地主机、端口的socket
(msg/connect
mq-context
((:node->host assignment) node)
port)
]
)))
到这里为止,topology里面的组件(spout/bolt)都根据parallelism被分成多个task, 而这些task被分配给supervisor的多个worker来执行。大家各司其职,整个topology已经运行起来了。
Topology的终止
除非你显式地终止一个topology, 否则它会一直运行的,可以用下面的命令去终止一个topology:
1
|
storm kill {stormname}
|
在这个命令的背后, storm-cluster-state
的remove-storm!
命令会被调用:
1
2
3
4
|
(remove-storm! [ this storm-id ]
(delete-node cluster-state (storm-task-root storm-id))
(delete-node cluster-state (assignment-path storm-id))
(remove-storm-base! this storm-id))
|
上面的代码会把zookeeper上面/tasks
, /assignments
, /storms
下面有关这个topology的数据都删除了。这些数据(或者目录)之前都是nimbus创建的。还剩下/taskbeats
以及/taskerrors
下的数据没有清除, 这块数据会在supervisor下次从zookeeper上同步数据的时候删除的(supervisor会删除那些已经不存在的topology相关的数据)。这样这个topology的数据就从storm集群上彻底删除了。
相关推荐
- **Nimbus** (主节点):作为集群中的主控节点,负责上传计算任务供执行、分发代码到集群中、在集群中启动工作进程,并监控计算过程,在必要时重新分配工作进程。 - **Zookeeper**:用于协调Storm集群中的各个节点,...
描述"基于Java的实例源码-开放实时数据处理平台 Twitter Storm.zip"进一步确认了这个压缩文件包含的是用于学习和参考的源代码,特别是对于那些想要理解和实施实时数据流处理的开发者来说,这是一个宝贵的资源。...
本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...
1. Spout:Spout是Apache Storm的数据输入组件,负责从外部数据源(如Kafka、Twitter或数据库)读取数据并生成数据流。在源码中,`backtype.storm.spout`包包含了各种Spout的实现,如`KafkaSpout`用于从Kafka消费...
这个版本可能包含了源代码、文档、示例以及编译和部署所需的配置文件,供开发者研究和使用。通过深入研究这些代码和文档,开发者可以更深入地理解Storm的工作原理,并利用它来构建自己的实时数据处理系统。
"nathanmarz-storm-9a3e1ec" 这个文件名可能指的是Nathan Marz的个人Storm分支的一个特定版本,9a3e1ec可能是Git仓库中的一个提交哈希,代表了该版本的源代码快照。这个版本可能包含了Storm的某些特定改进或修复。 ...
在这个压缩包中,我们可以预见到与Storm相关的各种源代码、配置文件或者示例项目。 Storm的核心概念包括: 1. **Spout**:数据源,负责产生数据流。可以是任何数据源,如数据库、消息队列等。 2. **Bolt**:数据...
Storm是一个开源的分布式实时计算系统,由Twitter开发并开源,其设计目标是让实时处理变得简单、强大且可靠。在Storm中,数据流被抽象为持续不断的Tuple(元组)序列,这些Tuple在网络中的worker节点间进行分布式...
- **DataSource**:外部数据源,如 Kafka、Twitter 等。 - **Spout**:数据源组件,负责读取外部数据并将数据转换为 Tuple 形式,传递给下游的 Bolt 处理。 - **Bolt**:数据处理组件,接收来自 Spout 或上游 Bolt ...
在"storm-starter-master"这个项目中,通常会包含一系列的示例,如简单的单词计数(WordCount)、日志分析等,这些示例有助于初学者理解如何在Storm中创建拓扑结构(Topology)并部署运行。每个示例都会展示如何定义...
### Storm 从零到精通知识点解析 #### 一、Storm简介 **1.1 什么是Storm** Apache Storm 是一个开源的分布式实时计算系统,能够处理大量实时数据流。Storm 的设计目的是为了实现实时处理,它能够确保每个事件都能...
- **Spout**:数据源,负责从外部源(如Kafka、Twitter等)拉取或接收数据,并将数据发布到Storm的各个工作节点。 - **Bolt**:处理组件,执行数据清洗、聚合、过滤等操作。 2. **实时处理**:Storm的强项在于...
源码版本的Apache Storm允许开发者查看和修改源代码,以便进行以下操作: - **自定义行为**:根据特定需求调整核心组件或实现新的功能。 - **性能优化**:分析和改进代码以提高处理速度或降低资源消耗。 - **调试和...
这个名为"apache-storm-2.1.0.tar.gz"的压缩包包含了Apache Storm的2.1.0版本,这是一个非源码的发行版,意味着它包含了编译后的二进制文件,可以直接在Linux环境中运行。该版本是由Apache软件基金会维护的,最初由...
“使用Apache Storm进行实时分析” ###简介Udacity与Twitter合作已经开始了关于Apache Storm的出色课程。 您可以在免费加入该课程。 这基本上是一个在制品。 ### Commands引导进入Storm虚拟机的命令。 vagrant up...
`Storm` 是一个开源的分布式实时计算系统,由Twitter开发并贡献给了Apache基金会。它能够处理无界数据流,即源源不断的实时数据,并保证每个消息只被处理一次(Exactly Once语义)。在商品订单频繁项集挖掘中,`...
Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType现在已在Twitter麾下。GitHub上的最新...
- **编译打包**:将源代码编译成JAR文件,通常会包含拓扑结构、依赖库等。 - **配置参数**:根据实际需求,配置拓扑运行参数,如worker数量、执行器(Executor)线程数等。 - **提交Job**:通过命令行工具`storm ...
在Apache Storm中,拓扑(Topology)是核心概念之一,它定义了数据流的处理逻辑。一个拓扑由多个 bolts(处理组件)和 spouts(数据源)组成,通过流(Stream)相互连接。Spouts负责从外部源(如消息队列或数据库)...