`
wbj0110
  • 浏览: 1610662 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Storm源码浅析之topology的提交

阅读更多

一、介绍
    Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:

     180 text files.
     177 unique files.                                          
       7 files ignored.

http://cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
-------------------------------------------------------------------------------

Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Java                           125           5010           2414          25661
Lisp                            33            732            283           4871
Python                           7            742            433           4675
CSS                              1             12             45           1837
ruby                             2             22              0            104
Bourne Shell                     1              0              0              6
Javascript                       2              1             15              6
-------------------------------------------------------------------------------
SUM:                           171           6519           3190          37160
-------------------------------------------------------------------------------


    Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
         
二、Topology和Nimbus        
    Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:

StormSubmitter.submitTopology(name, conf, builder.createTopology());

    我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:

bin/storm jar xxxx.jar com.taobao.MyTopology args

    将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:

def jar(jarfile, klass, *args):                                                                                                                               
   exec_storm_class(                                                                                                                                          
        klass,                                                                                                                                                
        jvmtype="-client",                                                                                                                                    
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
        args=args,                                                                                                                                            
        prefix="export STORM_JAR=" + jarfile + ";") 

     将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:

//StormSubmitter.java 
private static void submitJar(Map conf) {

        if(submittedJar==null) {
            LOG.info("Jar not uploaded to master yet. Submitting jar");
            String localJar = System.getenv("STORM_JAR");
            submittedJar = submitJar(conf, localJar);
        } else {
            LOG.info("Jar already uploaded to master. Not submitting jar.");
        }
    }

    通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。

    其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构

-nimbus
     -inbox
         -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
         -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

     -stormdist
        -storm-id
           -stormjar.jar
           -stormconf.ser
           -stormcode.ser

     其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。
    进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
 (1)从zk上获得已有的assignment(新的toplogy当然没有了)
 (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
 (3)将任务均匀地分配给可用的worker,这里有两种情况:
 (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配

{1: [host1:port1] 2 : [host2:port1]
         3 : [host1:port1] 4 : [host2:port1]}

,可以看到任务平均地分配在两个worker上。
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成

[host1:port1 host2:port1 host1:port2 host2:port2]

,然后分配任务为

{1: host1:port1 , 2 : host2:port2}


(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

分享到:
评论

相关推荐

    Storm 源码分析

    当用户提交一个Topology任务后,Nimbus会将该Topology分配给一个或多个Supervisor节点,Supervisor再在本地启动Worker进程执行Topology。每个Worker进程都会有自己的Spout和Bolt实例,这些实例通过Stream进行交互。 ...

    Storm源码走读笔记

    本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...

    storm提交topology的过程共1页.pdf.zip

    【标题】"storm提交topology的过程"涉及到的是Apache Storm这一分布式实时计算系统中的核心操作——部署和运行流处理任务,即topology。Apache Storm被广泛应用于实时数据处理、在线机器学习、持续计算以及大规模...

    storm0.9-源码包

    - **Topology**: 一个 Storm 应用由多个 Bolt 和 Spout 组成,它们通过拓扑结构连接,定义了数据流的流向。 2. **API 深度探索** - **TopologyBuilder**: 这个类用于构建 Storm 应用的拓扑结构,包括添加 Spout ...

    storm的测试源码

    - **集群模式**:当拓扑准备好后,你可以将其提交到Zookeeper协调的Storm集群上,进行分布式运行。 4. **容错机制**: - Storm通过检查点和状态持久化确保在节点故障时能够恢复,保证数据不丢失。 5. **监控与...

    apache-storm-0.9.5源码

    `lib/storm.thrift`包含了Thrift定义的Storm服务接口和数据结构,通过这些接口,用户可以编写Spouts、Bolts,并提交Topology到集群。 此外,`storm-core`模块中的`backtype.storm.util`提供了各种工具类,如`...

    storm开发jar包以及storm例子源码

    标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...

    storm源码包 apache-0.9.4

    Apache Storm的核心概念是拓扑(Topology),它由 bolts(处理组件)和 spouts(数据源)组成。Bolts 执行业务逻辑,而 Spouts 生成数据流。这些组件通过流(Stream)连接,形成一个有向无环图(DAG),在集群中...

    storm+kafka源码示例

    在我们的"storm+kafka源码示例"中,拓扑(Topology)是Storm的基本工作单元,它定义了数据流的处理流程。具体到这个示例,`StormKafkaTopoMain.java`是拓扑的主类,它会定义整个处理逻辑。在这个类中,你会看到如何...

    Storm杂谈之Topology的启动过程

    大家都知道,要提交StormTopology到Cluster,需要运行如下命令:bin目录下storm是一个Python文件,我们可以看一下Python脚本的main方法首先解析args参数,解析完了之后,把所有的参数传递给COMMANDS,由COMMANDS调用...

    STORM流计算Topology性能监控

    STORM的TOPOLOGY在线上运行时,随着数据量的增加,在一定的服务器性能及集群规模下,会渐渐达到一个极限,到达极限后,服务器的load、io、cpu、mem等可能会出现耗尽,系统很卡,storm吞吐量骤降的情况。本文档中截图...

    storm利用ack保证数据的可靠性源码

    用户可以通过设置`topology.message.timeout.secs`参数来调整tuple的超时时间,根据实际需求平衡数据的可靠性和系统性能。 5. **acker组件** Acker bolt是Storm的一部分,它负责处理ack和fail消息。acker收到ack...

    源码阅读之storm操作zookeeper-cluster.clj

    源码阅读之storm操作zookeeper-cluster.clj是深入理解storm框架如何与Zookeeper协同工作的关键。这篇文章主要聚焦于storm在Clojure语言环境下如何通过cluster.clj文件与Zookeeper集群进行交互,提供了对相关源码的...

    storm1.2.1-单机部署,运行自己开发的jar

    接下来,下载Apache Storm 1.2.1的源码或预编译二进制包。你可以访问Apache Storm的官方网站或通过Git克隆其GitHub仓库来获取。一旦下载完成,解压缩文件到你选择的目录,例如`/usr/local/storm`。 配置Storm环境。...

    storm job 提交集群测试 注意事项

    - **编写源码**:首先,你需要使用Java或Clojure编写Storm拓扑结构(Topology),定义数据流的处理逻辑,包括Spout(数据源)和Bolt(处理节点)。 - **编译打包**:将源代码编译成JAR文件,通常会包含拓扑结构、...

    Topology可视化在线绘图引擎 v0.5.27.zip

    "Topology可视化在线绘图引擎 v0.5.27.zip" 是一个包含了软件工具和源码的压缩包,主要用于创建和展示拓扑结构。拓扑学是数学的一个分支,研究空间对象及其相互关系,而在计算机科学中,拓扑可视化的应用广泛,特别...

    细细品味Storm_Storm简介及安装

    可以通过提交一个简单的Topology来验证Storm是否安装成功。 ### 总结 Storm是一款强大的分布式实时数据处理框架,具有低延迟、高性能、容错性好等特点。它支持多种编程语言,适用于信息流处理、连续计算和分布式...

    storm-kakfa使用state例子源码

    在这个"storm-kafka使用state例子源码"中,我们将会探讨如何结合两者,利用 Storm 的 State API 来处理从 Kafka 获取的数据。 首先,`storm-kafka`是Apache Storm的一个扩展,它提供了一种方便的方式去消费Kafka中...

Global site tag (gtag.js) - Google Analytics