`

Storm-源码分析-Topology Submit-Client

 
阅读更多

转载自:http://www.cnblogs.com/fxjwind

1 Storm Client

最开始使用storm命令来启动topology, 如下

storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology

这个storm命令是用python实现的, 看看其中的jar函数, 很简单, 调用exec_storm_class, 其中jvmtype="-client"
而exec_storm_class其实就是拼出一条java执行命令, 然后用os.system(command)去执行, 为何用Python写, 简单? 可以直接使用storm命令?
这儿的klass就是topology类, 所以java命令只是调用Topology类的main函数

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
"""
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        childopts="-Dstorm.jar=" + jarfile)

def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
    nativepath = confvalue("java.library.path", extrajars)
    args_str = " ".join(map(lambda s: "\"" + s + "\"", args))
    command = "java" + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp" + get_classpath(extrajars) + " " + klass + " " + args_str
    print "Running:" + command
    os.system(command)

直接看看WordCountTopology例子的main函数都执行什么?

除了定义topology, 最终会调用StormSubmitter.submitTopology(args[0], conf, builder.createTopology()), 来提交topology

    public static void main(String[] args) throws Exception {        
        TopologyBuilder builder = new TopologyBuilder();        
        builder.setSpout("spout", new RandomSentenceSpout(), 5);        
        builder.setBolt("split", new SplitSentence(), 8)
                 .shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12)
                 .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
      
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);            
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {        
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());   
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

StormSubmitter

直接看看submitTopology,
1. 配置参数
把命令行参数放在stormConf, 从conf/storm.yaml读取配置参数到conf, 再把stormConf也put到conf, 可见命令行参数的优先级更高
将stormConf转化为Json, 因为这个配置是要发送到服务器的

2. Submit Jar
StormSubmitter的本质是个Thrift Client, 而Nimbus则是Thrift Server, 所以所有的操作都是通过Thrift RPC来完成, Thrift参考Thrift,Storm-源码分析- Thrift的使用
先判断topologyNameExists, 通过Thrift client得到现在运行的topology的状况, 并check
然后Submit Jar, 通过底下三步
client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);
把数据通过RPC发过去, 具体怎么存是nimbus自己的逻辑的事...

3. Submit Topology
很简单只是简单的调用RPC
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);

    /**
     * Submits a topology to run on the cluster. A topology runs forever or until 
     * explicitly killed.
     *
     *
     * @param name the name of the storm.
     * @param stormConf the topology-specific configuration. See {@link Config}. 
     * @param topology the processing to execute.
     * @param options to manipulate the starting of the topology
     * @throws AlreadyAliveException if a topology with this name is already running
     * @throws InvalidTopologyException if an invalid topology was submitted
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        try {
            String serConf = JSONValue.toJSONString(stormConf);
            if(localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                if(topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(conf);
                try {
                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                    if(opts!=null) {
                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);                    
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, submittedJar, serConf, topology);                                            
                    }
                } catch(InvalidTopologyException e) {
                    LOG.warn("Topology submission exception", e);
                    throw e;
                } catch(AlreadyAliveException e) {
                    LOG.warn("Topology already alive exception", e);
                    throw e;
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " +  name);
        } catch(TException e) {
            throw new RuntimeException(e);
        }
    }
posted @ 2013-06-05 15:52 fxjwind 阅读(14) 评论(0) 编辑
分享到:
评论

相关推荐

    apache-storm-2.4.0.tar.gz

    在大数据领域,Storm 被广泛应用于实时分析、在线机器学习、持续计算、数据集成以及任何需要实时处理的场景。 标题 "apache-storm-2.4.0.tar.gz" 指的是 Apache Storm 的特定版本,即 2.4.0 版本的源码或二进制包,...

    apache-storm-0.9.5源码

    `storm-client`模块则包含了提交拓扑到集群的相关代码,如`Nimbus`接口和其实现,这是Storm集群的主控节点。 对于故障恢复,Storm采用了Zookeeper进行协调,确保任务的容错性。`storm-zookeeper`模块包含了与...

    storm-starter-master

    在"storm-starter-master"这个项目中,通常会包含一系列的示例,如简单的单词计数(WordCount)、日志分析等,这些示例有助于初学者理解如何在Storm中创建拓扑结构(Topology)并部署运行。每个示例都会展示如何定义...

    apache-storm-1.1.2.tar.gz

    在进行云计算实验时,学生可以通过部署这个压缩包来学习Storm的基本概念,如Topology(拓扑)、Bolt(处理逻辑)、Spout(数据源)以及容错机制。同时,也可以结合Hadoop和Spark,探索不同的大数据处理场景和解决...

    storm-0.8.1压缩包

    7. **创建和提交拓扑**:编写Storm拓扑(Topology)定义数据处理逻辑,然后通过Storm CLI提交到集群。 标签"storm"表明这个压缩包是关于Apache Storm的,Storm的核心概念包括: - **Spout**:Spout是数据源,负责...

    apache-storm-1.2.3.tar.gz

    开发 Storm 应用通常涉及定义 bolts(处理逻辑)和 spouts(数据源),并通过 Topology 连接它们形成数据处理流程。 在实际开发中,你可能还需要了解 Storm 的关键概念,如 Trident(一种高级接口,提供更强大的...

    Storm 源码分析

    ### Storm源码分析 #### 一、Storm简介与应用场景 Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时...

    apache-storm-2.1.0-src.tar.gz

    Apache Storm是一个开源的...总的来说,Apache Storm 2.1.0源码提供了深入了解和控制实时大数据处理的强大工具,对于需要高性能、低延迟实时分析的应用场景,以及对分布式计算感兴趣的开发者,都是极具价值的资源。

    storm-1.1.1.tar.gz

    Apache Storm的设计目标是处理大规模数据流,提供低延迟、高吞吐量的数据处理能力,常用于实时分析、在线机器学习、持续计算、大数据处理等领域。 描述中提到的"使用zookeeper-3.4.10.tar.gz"表明在搭建Storm集群时...

    storm0.9-源码包

    源码分析对于开发者来说,是提升技能和优化应用的关键步骤。 1. **核心组件** - **Bolt**: Bolt 是 Storm 中的主要处理组件,负责数据清洗、聚合、计算等任务。通过自定义 Bolt,用户可以实现自己的业务逻辑。 - ...

    storm-kakfa使用state例子源码

    在这个"storm-kafka使用state例子源码"中,我们将会探讨如何结合两者,利用 Storm 的 State API 来处理从 Kafka 获取的数据。 首先,`storm-kafka`是Apache Storm的一个扩展,它提供了一种方便的方式去消费Kafka中...

    apache-storm-2.1.0.tar.gz

    这个名为"apache-storm-2.1.0.tar.gz"的压缩包包含了Apache Storm的2.1.0版本,这是一个非源码的发行版,意味着它包含了编译后的二进制文件,可以直接在Linux环境中运行。该版本是由Apache软件基金会维护的,最初由...

    apache-storm-1.0.2-src

    1. **拓扑(Topology)**:在 Storm 中,用户通过编写拓扑来定义数据流的处理逻辑。一个拓扑由多个 bolts 和 spouts 组成,它们通过 streams 相互连接。Bolts 执行复杂的处理任务,而 Spouts 是数据流的源头,通常从...

    kafka-storm-starter-develop

    4. **编写 Storm Topology**:编写 Storm 代码,定义 bolts 和 spouts,处理从 Kafka 消费的数据。 5. **配置 Storm**:配置 Storm 连接 Kafka 的参数,如 zookeeper 地址、topic 名称等。 6. **运行 Storm Topology...

    Storm-EPL-Example-2.0.19(sp2)

    Apache Storm是一个开源的分布式实时计算系统,它能够处理数据流并进行连续计算,常用于大数据实时分析。EPL则是Storm中用于定义数据处理逻辑的一种语言,类似SQL,但针对实时数据流处理进行了优化。 描述中提到,...

    Storm-EPL-Example-2.0.19.zip

    通过阅读和分析示例中的Topology代码,我们可以学习如何定义数据流的处理流程,并理解Storm的事件驱动模型。 EPL则为这些数据流处理带来了SQL的便利。在传统的数据库中,SQL用于查询和操作静态数据;而在EPL中,SQL...

    apache-atlas-2.1.0-storm-hook.tar.gz--基于cdh6.3.1编译完成

    6. 在 Storm 作业中启用 Hook,通常是在 topology 定义中进行,这样 Hook 就会在数据处理过程中触发元数据操作。 一旦设置完成,Apache Atlas Storm Hook 将自动捕获 Storm 作业的元数据信息,包括源、转换和目标。...

    Fundamental-Principles-Behind-the-Sigma-Delta-ADC-Topology-Part-2_cn.pdf

    在本文中,我们将会探讨Sigma-Delta ADC的基本原理,并分析其如何通过不同的噪声消除策略来实现高精度数据采集。 首先,Sigma-Delta ADC的基本工作原理包括以下几个关键步骤: 1. 过采样:Sigma-Delta ADC通过远...

    apache-storm-1.0.2.tar.gz

    总结,Apache Storm 1.0.2 是一个强大的实时大数据处理工具,尤其适用于需要对海量数据进行实时分析的场景。在 Linux 环境下,用户需要了解如何解压和部署这个软件包,理解 Storm 的核心组件和工作原理,以及如何...

    sps-2013-streamlined-topology-model

    标题“sps-2013-streamlined-topology-model”指的是“SharePoint Server 2013 简化拓扑模型”,这是关于微软SharePoint Server 2013的一个特定知识领域。描述“Topology design guidance for maximizing system ...

Global site tag (gtag.js) - Google Analytics