`
kavy
  • 浏览: 890982 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Storm应用系列之——Topology部署

 
阅读更多

本系列属个人原创,转载请注明!

原文地址:http://blog.csdn.net/xeseo/article/details/18219183

本系列源码地址: https://github.com/EdisonXu/storm-samples

https://github.com/baijian/storm-java

https://github.com/ashrithr/storm-helloworld

根据前文介绍,我们知道,storm的任务是包装在topology类中,由nimbus提交分配到整个cluster。

Topology有两种大类提交部署方式:

 

  • 提交到本地模式,一般用于调试。该模式下由于是起在同一个JVM进程下,所以不要让其负载太高。
  • 提交到集群模式。

提交到本地模式

这个非常的简单。
1. 编写代码
  1. public class LocalRunningTopology extends ExclaimBasicTopo {  
  2.   
  3.     public static void main(String[] args) throws Exception {  
  4.   
  5.         LocalRunningTopology topo = new LocalRunningTopology();  
  6.         Config conf = new Config();  
  7.         conf.setDebug(true);  
  8.   
  9.         LocalCluster cluster = new LocalCluster();  
  10.         cluster.submitTopology("test", conf, topo.buildTopology());  
  11.         Utils.sleep(100000);  
  12.         cluster.killTopology("test");  
  13.         cluster.shutdown();  
  14.     }  
  15. }  
2. 直接就可以在IDE里面运行,也可以提交到nimbus上,用nimbus进行本地运行:./storm jar storm-samples.jar com.edi.storm.topos.LocalRunningTopology

提交到集群

1. 编写代码
  1. public class ClusterRunningTopology extends ExclaimBasicTopo {  
  2.   
  3.     public static void main(String[] args) throws Exception {  
  4.   
  5.         String topoName = "test";  
  6.           
  7.         ClusterRunningTopology topo = new ClusterRunningTopology();  
  8.         Config conf = new Config();  
  9.         conf.setDebug(true);  
  10.   
  11.         conf.setNumWorkers(3);  
  12.   
  13.         StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());  
  14.     }  
  15. }  
 
2. 编译jar包
因为我是Maven项目,所以直接 mvn clean install 生成jar
 
3. 上传至nimbus部署
./storm jar storm-samples.jar com.edi.storm.topos.ClusterRunningTopology

实际开发时常用提交模式

实际开发时,我们往往是把本地和集群混绑在一起,用传入参数以示区别
  1. public static void main(String[] args) throws Exception {  
  2.           
  3.         ExclaimBasicTopo topo = new ExclaimBasicTopo();  
  4.         Config conf = new Config();  
  5.         conf.setDebug(false);  
  6.   
  7.         if (args != null && args.length > 0) {  
  8.             conf.setNumWorkers(3);  
  9.   
  10.             StormSubmitter.submitTopology(args[0], conf, topo.buildTopology());  
  11.         } else {  
  12.   
  13.             LocalCluster cluster = new LocalCluster();  
  14.             cluster.submitTopology("test", conf, topo.buildTopology());  
  15.             Utils.sleep(100000);  
  16.             cluster.killTopology("test");  
  17.             cluster.shutdown();  
  18.         }  
  19.     }  
 
这样,本地就可以不传参直接运行,而需要部署到集群时,打完包传到nimbus上运行命令:
./storm jar storm-samples.jar com.edi.storm.topos.ClusterRunningTopology <TOPO_NAME>
填上一个集群唯一的<TOPO_NAME>即可。

有人又说了,这样还不是很方便,我能不能直接在IDE里面提交到storm集群?

可以。

 

IDE直接提交至集群

修改上面提交集群的代码如下:

 

  1. public static void main(String[] args) throws Exception {  
  2.   
  3.         String topoName = "test";  
  4.         ExclaimBasicTopo topo = new ExclaimBasicTopo();  
  5.         Config conf = new Config();  
  6.         conf.setDebug(false);  
  7.   
  8.         File jarFile = EJob.createTempJar(RemoteRunningTopology.class.getClassLoader().getResource("").getPath());  
  9.         ClassLoader classLoader = EJob.getClassLoader();  
  10.         Thread.currentThread().setContextClassLoader(classLoader);  
  11.           
  12.         //System.setProperty("storm.jar", Class.forName("com.edi.storm.topos.RemoteRunningTopology").getProtectionDomain().getCodeSource().getLocation().getPath());  
  13.         System.setProperty("storm.jar", jarFile.toString());  
  14.         conf.setNumWorkers(5);  
  15.         conf.setDebug(false);  
  16.         conf.put(Config.NIMBUS_HOST, "10.1.110.24");  
  17.         //conf.put(Config.NIMBUS_THRIFT_PORT, 8889);  
  18.         StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());  
  19.     }  


起作用的部分主要有三点:

 

1. 设置系统变量"storm.jar"。这个变量的值代表要部署的Topology的jar包地址。

这个地址必须是文件,所以,我们就可以写完代码后自己打个jar包放在某个固定位置,然后IDE直接运行该topology去集群提交部署。

当然,也可以在代码中打jar,所以我这里的代码中加入了一个打包的Utilities类,EJob。

 

2. 设置参数Config.NIMBUS_HOST,其值为nimbus的hostname或ip地址。

3. 设置参数Config.NIMBUS_THRIFT_PORT,其值为nimbus上Thrift接口的地址,也就是nimbus的conf/storm.yaml中参数nimbus.thrift.port的值,前提是你配了。如果没配,可以不设。

 

这样就可以直接在IDE里面运行提交上去了。

 

Topology提交原理

Topology提交后发生了什么呢?这个原理要放在这里讲了。因为这直接关系到对Strom运行概念的理解。

1. Nimbus$Iface的beginFileUpload,uploadChunk以及finishFileUpload方法将运行的包上传至其数据目录(storm.yaml中storm.local.dir对应的目录)下的inbox目录。

 

  1. /{storm.local.dir}  
  2.   |  
  3.   | - /nimbus  
  4.         |  
  5.         | - /inbox  
  6.               |  
  7.               | - /stormjar-{uuid}.jar  

 

 

不论上传的包名字是什么,最终会变成stormjar-{uuid}.jar。

2. Nimbus$Iface的submitTopology方法会负责对这个topology进行处理,首先是对Storm本身及topology进行一些校验:

a. 检查Storm状态是否active

b. 检查是否有同名topology在运行

c. 检查是否有同id的spout和bolt,以及其id是否合法。任何一个id都不能以"_"开头,这种命名方式是系统保留的。

3. 建立topology的本地目录

 

[plain] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. /{storm.local.dir}  
  2.     |  
  3.     | - /nimbus  
  4.           |  
  5.           | - /inbox  
  6.           | - /stormdist  
  7.                 |  
  8.                 | - /{topology-id}  
  9.                         |  
  10.                         | - /stormjar.jar -- 包含这个topology所有代码的jar包(从nimbus/inbox挪过来)  
  11.                         |  
  12.                         | -/stormcode.ser -- 这个topology对象的序列化  
  13.                         | -/stormconf.ser -- 运行这个topology的配置  

 

 

4. 建立该topology在zookeeper上的心跳目录

nimbus老兄是个有责任心的人,它虽然最终会把任务分成一个个task让supervisor去做,但是它时刻在关注着大家的情况,所以它要求每个task每隔一定时间就要给它打个招呼(心跳信息),让它知道事情还在正常发展。如果有task超时不打招呼,nimbus会人为这个task不行了,然后进行重新分配。zookeeper上的心跳目录:

 

[plain] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. /<span style="font-family: Consolas, 'Liberation Mono', Courier, monospace;">{storm.zookeeper.root}</span>  
  2.   
  3.   |  
  4.   | - /workerbeats  
  5.          |  
  6.          | - {topology-id}  
  7.                 |  
  8.                 | - /{task-id}  -- task的心跳信息,包括心跳的时间,task运行时间以及一些统计信息  

 

 

5. 计算topology的工作量

nimbus会根据topology中给的parallelism hint参数,来给spout/bolt设定task数目,并分配相应的task-id,然后把分配号的task信息写到zookeeper上去:

 

[plain] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. /{storm.zookeeper.root}  
  2.   |  
  3.   | - /assignments  
  4.         |  
  5.         | - /{topology-id}  


6. 保存toplogy信息到zookeeper

 

 

[plain] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. /{storm.zookeeper.root}  
  2.     |  
  3.     | - /storms  
  4.           |  
  5.           | - /{topology-id}  

 

7. supervisor因为监听了zookeeper上的目录,所以当它发现有topology时,会先把所有的topology的信息如jar等下到本地,并删除不再运行的topology的本地信息

 

[plain] view plaincopyprint?在CODE上查看代码片派生到我的代码片
 
  1. /{storm.local.dir}  
  2.     |  
  3.     | - /supervisor  
  4.           |  
  5.           | - stormdist  
  6.                |  
  7.                | - {topology-id}  
  8.                       |  
  9.                       | - stormcode.ser  
  10.                       | - stormconf.ser  
  11.                       | - stormjar.jar  


8. supervisor根据分配的任务,去启动worker去处理assignment

 

9. worker启动后,会去zookeeper上找其对应的task。同时根据task的outbound信息建立对外的socket连接,将来发送tuple就是从这些socket连接发出去的。

到这里,一个topology就已经完全部署和运转起来了。

 

分享到:
评论

相关推荐

    storm提交topology的过程共1页.pdf.zip

    【标题】"storm提交topology的过程"涉及到的是Apache Storm这一分布式实时计算系统中的核心操作——部署和运行流处理任务,即topology。Apache Storm被广泛应用于实时数据处理、在线机器学习、持续计算以及大规模...

    storm企业应用 实战 运维和调优

    但是根据标题和描述中的信息,我能够为你生成关于Storm企业应用实战运维和调优的知识点。 Apache Storm是一个实时计算的分布式计算框架,它类似于Hadoop,但它是为了实时处理而不是批处理设计的。Storm可以处理大量...

    storm部署(包括所有依赖rpm包、集群搭建详解)

    在IT行业中,Apache Storm是一个实时计算系统,常用于大数据...通过以上步骤,可以成功搭建一个运行Storm应用的集群。在实际操作中,务必仔细检查每个环节,确保所有服务正常运行,从而实现高效、稳定的实时数据处理。

    storm demo

    【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...

    细细品味Storm_Storm简介及安装

    Storm分布式实时计算模式由Apache Storm 项目核心贡献者吉奥兹、奥尼尔亲笔撰 写,融合了作者丰富的Storm实战经验,通过大量示例,全面而系统地讲解使用Storm进行分布式实 时计算的核心概念及应用,并针对不同的应用...

    storm1.2.1-单机部署,运行自己开发的jar

    在本教程中,我们将深入探讨如何在单机环境中部署Apache Storm 1.2.1,并运行你自己开发的Java应用程序(以jar包的形式)。Storm是一个分布式实时计算系统,它允许开发者处理无界数据流,实现低延迟和高吞吐量的数据...

    第一个Storm应用

    写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地...

    论文研究-Storm集群下一种基于Topology的任务调度策略.pdf

    Storm作为开源的分布式实时计算系统在业界得到了广泛应用,针对Storm自带调度策略忽略了Topology组件任务间的逻辑耦合性,从而引起大量tuple传输产生较大网络时延问题,结合进程代数将Topology等效简化为具有明显...

    Storm实现的应用模型研究

    ### Storm实现的应用模型研究 #### 一、Storm概述 Storm是一种开源的分布式实时计算框架,由Twitter公司开发并开源。它能够高效、可靠地处理大量的数据流,适用于实时数据分析、在线机器学习、持续计算、ETL(提取...

    Storm企业级应用实战、运维和调优.zip

    10. **最佳实践**:总结Storm在企业应用中的最佳实践,包括开发、部署、运维等方面的建议,帮助读者避免常见的陷阱和误区。 通过对这本书的学习,读者不仅可以掌握Storm的基本用法,还能深入了解企业级实时处理系统...

    flume及kafka及storm搭建.rar

    在大数据处理领域,Flume、Kafka和Storm是三个至关重要的工具,它们分别在数据采集、数据分发和实时处理方面发挥着核心作用。这里我们将深入探讨这三个组件以及如何搭建它们。 1. Flume:Flume是Apache软件基金会的...

    storm组件应用说明书

    **Storm组件应用说明书** 本文档将全面介绍Apache Storm的基础应用,旨在为初学者提供一份详细的入门教程,帮助读者理解并掌握Storm在大数据流计算中的核心功能和操作步骤。Storm是一款开源的分布式实时计算系统,...

    STORM流计算Topology性能监控

    STORM的TOPOLOGY在线上运行时,随着数据量的增加,在一定的服务器性能及集群规模下,会渐渐达到一个极限,到达极限后,服务器的load、io、cpu、mem等可能会出现耗尽,系统很卡,storm吞吐量骤降的情况。本文档中截图...

    Storm 源码分析

    - **Nimbus**:Nimbus是Storm集群的核心组件之一,负责整个集群的管理和协调工作,包括任务调度、故障恢复等。Nimbus通过Zookeeper来实现状态同步和集群协调。 - **Supervisor**:Supervisor运行在每个Worker节点上...

    Storm实时数据处理.pdf

    - **定义**:Storm中的应用被称为Topology。一个Topology是由多个Spout和Bolt组成的有向无环图(DAG)。 - **作用**:负责定义数据流的流向和处理逻辑。 #### 2. **Spout** - **定义**:Spout是数据源,负责将数据...

    storm入门.pdf

    Storm的基本单位是“topology”(拓扑结构),它可以理解为一个实时计算的网络图,包含spouts和bolts两个主要组件。 Spout是拓扑中的数据源组件,主要负责从外部数据源如文件、数据库或者消息队列中获取原始数据,...

Global site tag (gtag.js) - Google Analytics