`
yangyangmyself
  • 浏览: 232965 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

STORM 拓扑构建

阅读更多

一、Storm大数据位置

 

解决方案

开发商

类型

描述

Storm

Twitter

流式处理

Twitter 的新流式大数据分析解决方案

S4

Yahoo!

流式处理

来自 Yahoo! 的分布式流计算平台

Hadoop

Apache

批处理

MapReduce 范式的第一个开源实现

Spark

UC Berkeley AMPLab

批处理/流处理

支持内存中数据集和恢复能力的最新分析平台

Disco

Nokia

批处理

Nokia 的分布式 MapReduce 框架

HPCC

LexisNexis

批处理

HPC 大数据集群

 

 

二、Storm概念及组件

 

  在Storm拓扑构建前我们先复习一下Storm概念及组件:

 

     Nimbus:负责资源分配和任务调度。

 

     Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。

 

     Worker:运行具体处理组件逻辑的进程。

 

      Taskworker中每一个spout/bolt的线程称为一个task. storm0.8之后,task不再与物理线程对应,同一个   

 

      spout/bolttask可能会共享一个物理线程,该线程称为executor

 

      Topologystorm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。

 

      Spout:在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为

 

       topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用

 

   此函数,用户只要在其中生成源数据即可。

 

       Bolt:在一个topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等

 

   任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函

 

   数,用户可以在其中执行自己想要的操作。

 

       TupleStorm SpoutBolt组件消息传递的基本单元(数据模型),Tuple是包含名称的列表,Storm支持所

 

   有原生类型,字节数组为Tuple字段传递,如果要传递自定义对象,需要实现接口serializer

 

       Stream:源源不断传递的tuple就组成了stream

  

  

 

图(一)

 

 三、创建逻缉组件

      

       以下图二所示,创建Storm组件SpoutBoltStorm拓扑构建并结合拓扑描述并发情况(Worker

 

   ExecutorTask关系)

 



 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 图(二)

 

      SPOUT:自定义类实现IRichSpout接口或继承BaseRichSpout即创建Spout组件。Spout从外部数据源读取数据

 

(队列、DRPC等)为Storm拓扑提供数据,Storm可以实现可靠或不可靠,可靠性表现在Storm可以对Storm

 

 toplogy处理失败的tuple进行重发,反之不处理。

 

    Spout可以发送多个流,通过在调用SpoutOutputCollector类的emit方法的同时使用declareStream方法申

 

 明并指定多个流发送。


     Spout中主要方法nextTuple,能够发送新流到toplopy及无数据流发送时直接返回。最重要的是不要阻塞

 

 nextTuple方法,因为Spout在同一个线程执行。Spout主要将读取到的数据组织成Tuple发送到Bolt组件处

 

 理。


     Spout另一个重要的方法时ack和fail,Storm监控到tuple从Spout发送到toplogy成功完成或失败时调用ack

 

 和fail(数据可靠性请参考其它文档)


     示例Spout发tuple到默认流:

 

package com.sunshine.spout;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* 绿色Spout,参考图二
*/
public class GreenSpout extends BaseRichSpout{
	private static final long serialVersionUID = -1215556162813479167L;
	private SpoutOutputCollector collector;
	/**
	 * Storm自动初始化
	 */
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
	}
	/**
	 * Storm不断调用此方法,传递单词到Bolt组件处理
	 */
	@Override
	public void nextTuple() {
		String[] words = {"green", "yellow", "blue"}; 
		for(String word : words){
			/**
			 * Values是List子类,发送数据需要封装在Values
			 * 本次存储一个单词发送与declareOutputFields()
			 * 方法申明输出一个字段对应
			 */
			collector.emit(new Values(word), word); // 1
		}
	}
	/**
	 * 申明发送tuple字段名称
	 * 在Bolt组件可以根所名称或索引获取Spout传递的Tuple
	 * tuple.getValue(0)
	 * 或tuple.getValueByField("word");
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {	
		declarer.declare(new Fields("word")); // 1
	}
	@Override
	public void ack(Object msgId) {
		System.out.println("success-->" + msgId);
	}
	@Override
	public void fail(Object msgId) {
		System.out.println("fail-->" + msgId);
	}
}

 
 

      BOLT:自定义类实现IRichBolt接口或者继承BaseRichBolt类即Bolt组件。Toplogy里所有处理都在Bolt完成,

 

 Bolt可以做任何事,如过滤、聚合、连接、操作数据库等,也可以将数据传递一下Bolt组件处理。

 

       Bolt可以做简单流转化或发送多个流(参考Spout发送多个流的方式),Bolt成功将消息处理后通知ACK

 

 件(可靠)。

 

package com.sunshine.bolt;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
*黄色Spout,参考图二
*/
public class YellowBolt extends BaseRichBolt{
	private static final long serialVersionUID = 7593355203928566992L;
	private OutputCollector collector;
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}
	@Override
	public void execute(Tuple input) {
		String word = (String)input.getValueByField("word");
		if(word != null){
			collector.emit(new Values(word, word));
		}
		collector.ack(input);
	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("yword1","yword2"));
	}
}

 

package com.sunshine.bolt;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* 蓝色Spout,参考图二
*/
public class BlueBolt extends BaseRichBolt{
	private static final long serialVersionUID = 4342676753918989102L;
	private OutputCollector collector;
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}
	@Override
	public void execute(Tuple input) {
		String yword1 = (String)input.getValueByField("yword1");
		String yword2 = (String)input.getValueByField("yword2");
		System.out.println("yword1:" + yword1 +",yword2:" + yword2);
		collector.ack(input);
	}
	/**
	 * 结束
	 */
	@Override
	public void cleanup() {
		super.cleanup();
	}
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 不再传递下一个Bolt组件处理
	}
}

 

       Toplogy(拓扑):参考图二及上述代码,构建Storm拓扑代码如下;在WIN7上使用本地运行模式,存在

 

        Zookeeper连接问题:

 

java.net.SocketException: Address family not supported by protocol family: connect”

 

 原因:Win7启动IPV6

 

 解决:增加配置属性System.setProperty("java.net.preferIPv4Stack", "true");

 

package com.sunshine;
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import com.sunshine.bolt.BlueBolt;
import com.sunshine.bolt.YellowBolt;
import com.sunshine.spout.GreenSpout;
import com.sunshine.tools.StormRunner;
/**
 * STROM启动类
 * @author OY
 * @version 0.1
 */
public class SimpleTopolog {
	public static void main(String[] args) throws Exception {
		// 解决ZOOKEEPER客户端连接服务端问题(IPV6)
		System.setProperty("java.net.preferIPv4Stack", "true");
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("green-spout", new GreenSpout(), 2) // 2 executor(线程)
		       .setNumTasks(4); // 4 task 对应2个executor
		builder.setBolt("yellow-bolt", new YellowBolt(),6) // 6 executor(线程)
				.shuffleGrouping("green-spout"); //tuple随机分发到bolt处理
		builder.setBolt("blue-bolt", new BlueBolt(), 2) // 2 executor(线程)
			   .shuffleGrouping("yellow-bolt");
		Config conf = new Config();
		/*设置工作进程数*/
		conf.setNumWorkers(2); 
		conf.setDebug(true);
		/*本地运行模式*/
		StormRunner.runTopologyLocally(builder.createTopology(), "simpleTopology", conf, 0);
	}
}

 

Storm运行模式工具类

 

 

package com.sunshine.tools;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
public final class StormRunner {
  private static final int MILLIS_IN_SEC = 1000;
  private StormRunner() {
  }
  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
      throws InterruptedException {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }
  public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf)
      throws AlreadyAliveException, InvalidTopologyException {
    StormSubmitter.submitTopology(topologyName, conf, topology);
  }
}

 

 

 

  • 大小: 84.4 KB
  • 大小: 79.4 KB
  • 大小: 30.2 KB
分享到:
评论

相关推荐

    Storm实战构建大数据实时计算

    《Storm实战构建大数据实时计算》一书主要涵盖了利用Apache Storm进行大数据实时处理的核心技术和实践案例。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高吞吐量、低延迟和容错...

    《Storm实战构建大数据实时计算》PDF

    读者将了解到Storm中的spout(数据源)、bolt(数据处理)以及topology(拓扑结构)等关键组件,理解它们如何协同工作来实现数据的实时处理。 接下来,书中详细讲解了如何安装和配置Storm环境,以及如何创建和部署...

    Storm 实战:构建大数据实时计算 PDF带书签完整版

    3. **Spout组件**:Spout是Storm拓扑中的数据源,它可以是任何产生数据流的源头,如消息队列、数据库或是传感器等。Spout负责读取数据并以流的形式分发到Bolt。 4. **Bolt组件**:Bolt是Storm处理逻辑的核心,它们...

    Storm实战构建大数据实时计算( 带书签目录 高清完整版)

    《Storm实战构建大数据实时计算》是一本专注于大数据处理领域的专著,主要围绕开源分布式实时计算系统Apache Storm展开。Apache Storm是一个强大的工具,用于处理大规模的数据流处理,它以高吞吐量、容错性以及实时...

    storm-config:将配置文件传递到Storm拓扑的示例

    这是如何将配置参数传递到Storm拓扑中的示例。 它建立在基础上。 与简单的Echo拓扑一样,包括基于的生产者。 配置 配置文件存储在config目录中。 其中包含的两个是docker.properties和aws.properties 。 码头工人 ...

    Storm事务性拓扑详解教程.docx

    在标准的Storm拓扑中,处理数据主要依赖于`ack`和`fail`机制。当一个元组被成功处理后,会发送一个`ack`信号,如果处理失败则发送`fail`。然而,当元组需要重发时,可能会导致重复处理,这就引入了不准确性和一致性...

    storm demo 单机版 maven

    描述提到"stormdemo实现不安装storm系列软件的基础上,运行调试storm拓扑",这暗示了该项目包含了一种方法,可以在没有预先安装Apache Storm的情况下运行和调试Storm拓扑。通常,这可以通过使用Maven的`storm-...

    apache-pig-on-storm:使用Apache Pig的Pig Latin生成并运行Apache Storm拓扑

    Pig Latin的抽象层次使得它成为构建Storm拓扑的理想选择,尤其是对于需要复杂数据转换和分析的场景。 描述中的"使用Apache Pig的Pig Latin生成并运行Apache Storm拓扑"进一步强调了这个过程:首先,我们使用Pig ...

    Building Python Real-Time Applications with Storm - Kartik Bhatnagar.pdf.pdf

    第三部分介绍了Petrel,这是一个轻量级的封装,用于简化Storm拓扑的构建和打包过程。这部分解释了如何使用Petrel构建一个基本的拓扑,以及如何进行事件日志记录和错误管理。同时,也讲解了如何管理第三方依赖,并对...

    storm的jar包

    6. **Examples**: 压缩包可能还包括示例代码,帮助初学者理解如何构建和部署Storm拓扑。 7. **Libraries**: 额外的库文件,可能包含Storm与外部系统(如Hadoop、Cassandra等)集成所需的依赖。 8. **Docs**: 文档...

    Getting.Started.with.Storm(2012.8)].Jonathan.Leibiusky.文字版.pdf

    Storm拓扑的构建需要定义Spout、Bolt以及它们之间的连接关系,然后提交到Storm集群中运行。 本书《Getting Started with Storm》由Jonathan Leibiusky、Gabriel Eisbruch和Dario Simonassi编写,由O’Reilly Media...

    storm开发jar包以及storm例子源码

    10. **Topologies的生命周期**:创建、提交、激活、停止和重新平衡,这些都是控制Storm拓扑运行状态的重要操作。 在压缩包中的"strom开发"文件可能包含了上述过程的详细步骤,源码示例,以及可能的配置文件和文档。...

    storm实时代码

    storm-kafka是Storm的一个集成组件,它允许Storm拓扑从Kafka主题中消费消息。这个组件负责管理和维护与Kafka的连接,以及从Kafka消费者组中拉取数据,然后分发到Storm的各个worker节点进行处理。 描述中提到的...

    简单的storm例子.rar

    这个简单的storm例子为学习者提供了一个动手实践的机会,帮助他们掌握如何设计、编写、构建和部署Storm拓扑,从而更好地理解和运用Storm在大数据实时处理中的能力。同时,通过分析源代码,开发者还可以了解到Storm的...

    workberch-tolopogy:由 Taverna Workbench 上的工作流文件创建的动态 Apache Storm 拓扑

    在本项目中,"workberch-tolopogy" 提供了一种方法,将 Taverna 工作流转换为 Apache Storm 的动态拓扑,从而利用 Storm 的实时处理能力。 1. **Taverna Workbench**: Taverna 是一个开放源代码的工作流管理系统...

    StormStorm集成Kafka 从Kafka中读取数据

    `KafkaSpout`是一个特殊的Spout,它负责从Kafka获取数据并将其作为流传递到Storm拓扑的其余部分。以下步骤概述了这一过程: 1. 添加依赖:在项目中引入Storm和Kafka相关的库,如storm-kafka或storm-kafka-client。 ...

    storm-kafka整合代码

    1. **拓扑(Topology)**:拓扑是 Storm 应用的核心,它定义了数据流的处理逻辑,由 spout 和 bolt 组成。 2. **worker**:worker 是运行在集群节点上的进程,负责执行拓扑中的任务。 3. **spout**:spout 是数据源...

Global site tag (gtag.js) - Google Analytics