一、Storm大数据位置
解决方案 |
开发商 |
类型 |
描述 |
Storm |
|
流式处理 |
Twitter 的新流式大数据分析解决方案 |
S4 |
Yahoo! |
流式处理 |
来自 Yahoo! 的分布式流计算平台 |
Hadoop |
Apache |
批处理 |
MapReduce 范式的第一个开源实现 |
Spark |
UC Berkeley AMPLab |
批处理/流处理 |
支持内存中数据集和恢复能力的最新分析平台 |
Disco |
Nokia |
批处理 |
Nokia 的分布式 MapReduce 框架 |
HPCC |
LexisNexis |
批处理 |
HPC 大数据集群 |
二、Storm概念及组件
在Storm拓扑构建前我们先复习一下Storm概念及组件:
Nimbus:负责资源分配和任务调度。
Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
Worker:运行具体处理组件逻辑的进程。
Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个
spout/bolt的task可能会共享一个物理线程,该线程称为executor。
Topology:storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。
Spout:在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为
topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用
此函数,用户只要在其中生成源数据即可。
Bolt:在一个topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等
任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函
数,用户可以在其中执行自己想要的操作。
Tuple:Storm Spout、Bolt组件消息传递的基本单元(数据模型),Tuple是包含名称的列表,Storm支持所
有原生类型,字节数组为Tuple字段传递,如果要传递自定义对象,需要实现接口serializer。
Stream:源源不断传递的tuple就组成了stream。
图(一)
三、创建逻缉组件
以下图二所示,创建Storm组件Spout、Bolt及Storm拓扑构建并结合拓扑描述并发情况(Worker、
Executor、Task关系)
图(二)
SPOUT:自定义类实现IRichSpout接口或继承BaseRichSpout即创建Spout组件。Spout从外部数据源读取数据
(队列、DRPC等)为Storm拓扑提供数据,Storm可以实现可靠或不可靠,可靠性表现在Storm可以对Storm
toplogy处理失败的tuple进行重发,反之不处理。
Spout可以发送多个流,通过在调用SpoutOutputCollector类的emit方法的同时使用declareStream方法申
明并指定多个流发送。
Spout中主要方法nextTuple,能够发送新流到toplopy及无数据流发送时直接返回。最重要的是不要阻塞
nextTuple方法,因为Spout在同一个线程执行。Spout主要将读取到的数据组织成Tuple发送到Bolt组件处
理。
Spout另一个重要的方法时ack和fail,Storm监控到tuple从Spout发送到toplogy成功完成或失败时调用ack
和fail(数据可靠性请参考其它文档)
示例Spout发tuple到默认流:
package com.sunshine.spout; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * 绿色Spout,参考图二 */ public class GreenSpout extends BaseRichSpout{ private static final long serialVersionUID = -1215556162813479167L; private SpoutOutputCollector collector; /** * Storm自动初始化 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * Storm不断调用此方法,传递单词到Bolt组件处理 */ @Override public void nextTuple() { String[] words = {"green", "yellow", "blue"}; for(String word : words){ /** * Values是List子类,发送数据需要封装在Values * 本次存储一个单词发送与declareOutputFields() * 方法申明输出一个字段对应 */ collector.emit(new Values(word), word); // 1 } } /** * 申明发送tuple字段名称 * 在Bolt组件可以根所名称或索引获取Spout传递的Tuple * tuple.getValue(0) * 或tuple.getValueByField("word"); */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); // 1 } @Override public void ack(Object msgId) { System.out.println("success-->" + msgId); } @Override public void fail(Object msgId) { System.out.println("fail-->" + msgId); } }
BOLT:自定义类实现IRichBolt接口或者继承BaseRichBolt类即Bolt组件。Toplogy里所有处理都在Bolt完成,
Bolt可以做任何事,如过滤、聚合、连接、操作数据库等,也可以将数据传递一下Bolt组件处理。
Bolt可以做简单流转化或发送多个流(参考Spout发送多个流的方式),Bolt成功将消息处理后通知ACK组
件(可靠)。
package com.sunshine.bolt; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** *黄色Spout,参考图二 */ public class YellowBolt extends BaseRichBolt{ private static final long serialVersionUID = 7593355203928566992L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = (String)input.getValueByField("word"); if(word != null){ collector.emit(new Values(word, word)); } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("yword1","yword2")); } }
package com.sunshine.bolt; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; /** * 蓝色Spout,参考图二 */ public class BlueBolt extends BaseRichBolt{ private static final long serialVersionUID = 4342676753918989102L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String yword1 = (String)input.getValueByField("yword1"); String yword2 = (String)input.getValueByField("yword2"); System.out.println("yword1:" + yword1 +",yword2:" + yword2); collector.ack(input); } /** * 结束 */ @Override public void cleanup() { super.cleanup(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 不再传递下一个Bolt组件处理 } }
Toplogy(拓扑):参考图二及上述代码,构建Storm拓扑代码如下;在WIN7上使用本地运行模式,存在
Zookeeper连接问题:
“java.net.SocketException: Address family not supported by protocol family: connect”
原因:Win7启动IPV6
解决:增加配置属性System.setProperty("java.net.preferIPv4Stack", "true");
package com.sunshine; import backtype.storm.Config; import backtype.storm.topology.TopologyBuilder; import com.sunshine.bolt.BlueBolt; import com.sunshine.bolt.YellowBolt; import com.sunshine.spout.GreenSpout; import com.sunshine.tools.StormRunner; /** * STROM启动类 * @author OY * @version 0.1 */ public class SimpleTopolog { public static void main(String[] args) throws Exception { // 解决ZOOKEEPER客户端连接服务端问题(IPV6) System.setProperty("java.net.preferIPv4Stack", "true"); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("green-spout", new GreenSpout(), 2) // 2 executor(线程) .setNumTasks(4); // 4 task 对应2个executor builder.setBolt("yellow-bolt", new YellowBolt(),6) // 6 executor(线程) .shuffleGrouping("green-spout"); //tuple随机分发到bolt处理 builder.setBolt("blue-bolt", new BlueBolt(), 2) // 2 executor(线程) .shuffleGrouping("yellow-bolt"); Config conf = new Config(); /*设置工作进程数*/ conf.setNumWorkers(2); conf.setDebug(true); /*本地运行模式*/ StormRunner.runTopologyLocally(builder.createTopology(), "simpleTopology", conf, 0); } }
Storm运行模式工具类
package com.sunshine.tools; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; public final class StormRunner { private static final int MILLIS_IN_SEC = 1000; private StormRunner() { } public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, topology); Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); cluster.killTopology(topologyName); cluster.shutdown(); } public static void runTopologyRemotely(StormTopology topology, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException { StormSubmitter.submitTopology(topologyName, conf, topology); } }
相关推荐
《Storm实战构建大数据实时计算》一书主要涵盖了利用Apache Storm进行大数据实时处理的核心技术和实践案例。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高吞吐量、低延迟和容错...
读者将了解到Storm中的spout(数据源)、bolt(数据处理)以及topology(拓扑结构)等关键组件,理解它们如何协同工作来实现数据的实时处理。 接下来,书中详细讲解了如何安装和配置Storm环境,以及如何创建和部署...
3. **Spout组件**:Spout是Storm拓扑中的数据源,它可以是任何产生数据流的源头,如消息队列、数据库或是传感器等。Spout负责读取数据并以流的形式分发到Bolt。 4. **Bolt组件**:Bolt是Storm处理逻辑的核心,它们...
《Storm实战构建大数据实时计算》是一本专注于大数据处理领域的专著,主要围绕开源分布式实时计算系统Apache Storm展开。Apache Storm是一个强大的工具,用于处理大规模的数据流处理,它以高吞吐量、容错性以及实时...
这是如何将配置参数传递到Storm拓扑中的示例。 它建立在基础上。 与简单的Echo拓扑一样,包括基于的生产者。 配置 配置文件存储在config目录中。 其中包含的两个是docker.properties和aws.properties 。 码头工人 ...
在标准的Storm拓扑中,处理数据主要依赖于`ack`和`fail`机制。当一个元组被成功处理后,会发送一个`ack`信号,如果处理失败则发送`fail`。然而,当元组需要重发时,可能会导致重复处理,这就引入了不准确性和一致性...
描述提到"stormdemo实现不安装storm系列软件的基础上,运行调试storm拓扑",这暗示了该项目包含了一种方法,可以在没有预先安装Apache Storm的情况下运行和调试Storm拓扑。通常,这可以通过使用Maven的`storm-...
Pig Latin的抽象层次使得它成为构建Storm拓扑的理想选择,尤其是对于需要复杂数据转换和分析的场景。 描述中的"使用Apache Pig的Pig Latin生成并运行Apache Storm拓扑"进一步强调了这个过程:首先,我们使用Pig ...
第三部分介绍了Petrel,这是一个轻量级的封装,用于简化Storm拓扑的构建和打包过程。这部分解释了如何使用Petrel构建一个基本的拓扑,以及如何进行事件日志记录和错误管理。同时,也讲解了如何管理第三方依赖,并对...
`KafkaSpout`是一个特殊的Spout,它负责从Kafka获取数据并将其作为流传递到Storm拓扑的其余部分。以下步骤概述了这一过程: 1. 添加依赖:在项目中引入Storm和Kafka相关的库,如storm-kafka或storm-kafka-client。 ...
6. **Examples**: 压缩包可能还包括示例代码,帮助初学者理解如何构建和部署Storm拓扑。 7. **Libraries**: 额外的库文件,可能包含Storm与外部系统(如Hadoop、Cassandra等)集成所需的依赖。 8. **Docs**: 文档...
Storm拓扑的构建需要定义Spout、Bolt以及它们之间的连接关系,然后提交到Storm集群中运行。 本书《Getting Started with Storm》由Jonathan Leibiusky、Gabriel Eisbruch和Dario Simonassi编写,由O’Reilly Media...
10. **Topologies的生命周期**:创建、提交、激活、停止和重新平衡,这些都是控制Storm拓扑运行状态的重要操作。 在压缩包中的"strom开发"文件可能包含了上述过程的详细步骤,源码示例,以及可能的配置文件和文档。...
storm-kafka是Storm的一个集成组件,它允许Storm拓扑从Kafka主题中消费消息。这个组件负责管理和维护与Kafka的连接,以及从Kafka消费者组中拉取数据,然后分发到Storm的各个worker节点进行处理。 描述中提到的...
这个简单的storm例子为学习者提供了一个动手实践的机会,帮助他们掌握如何设计、编写、构建和部署Storm拓扑,从而更好地理解和运用Storm在大数据实时处理中的能力。同时,通过分析源代码,开发者还可以了解到Storm的...
在本项目中,"workberch-tolopogy" 提供了一种方法,将 Taverna 工作流转换为 Apache Storm 的动态拓扑,从而利用 Storm 的实时处理能力。 1. **Taverna Workbench**: Taverna 是一个开放源代码的工作流管理系统...
1. **拓扑(Topology)**:拓扑是 Storm 应用的核心,它定义了数据流的处理逻辑,由 spout 和 bolt 组成。 2. **worker**:worker 是运行在集群节点上的进程,负责执行拓扑中的任务。 3. **spout**:spout 是数据源...