`

storm spout bolt api基本介绍

 
阅读更多

注:转帖请注明,原帖地址:

http://blog.csdn.net/xeseo/article/details/17750379

 


 

Component

Storm中,Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口
 
 
全家普如下:
 
绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的,在以后的文章会具体讲解。
 
BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。 
 


Spout

在前面基本例子中,我们实现了一个RandomSpout,来看看其类图

 

 

  • Spout的最顶层抽象是ISpout接口。

 

 

open方法是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。

close方法在该spout关闭前执行,但是并不能得到保证其一定被执行。spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。

activatedeactivate :一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。

 

nextTuple 用来发射数据。

ack(Object) 

传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。

fail(Object)

同ack,只不过是tuple处理失败时执行。

 

我们的RandomSpout 由于继承了BaseRichSpout,所以不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。

 

结论:

通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。



Bolt

ExclaimBasicBolt的类图:
这里可以看到一个奇怪的问题:
为什么IBasicBolt并没有继承IBolt?
我们带着问题往下看。
 
IBolt定义了三个方法:

 

  • IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文
  • execute接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果
  • cleanup 同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
红色部分是Bolt实现时一定要注意的地方。而Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。
如果你确实要反馈失败,可以抛出FailedException。
 
我们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码如下:
  1. public class ExclaimRichBolt extends BaseRichBolt {  
  2.   
  3.     private OutputCollector collector;  
  4.       
  5.     @Override  
  6.     public void prepare(Map stormConf, TopologyContext context,  
  7.             OutputCollector collector) {  
  8.         this.collector = collector;  
  9.     }  
  10.   
  11.     @Override  
  12.     public void execute(Tuple tuple) {  
  13.         this.collector.emit(tuple, new Values(tuple.getString(0)+"!"));  
  14.         this.collector.ack(tuple);  
  15.     }  
  16.   
  17.     @Override  
  18.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  19.         declarer.declare(new Fields("after_excl"));  
  20.     }  
  21.   
  22. }  
修改topology
  1. //builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");  
  2. builder.setBolt("exclaim"new ExclaimRichBolt(), 2).shuffleGrouping("spout");  
运行下,结果一致。
 
结论:
通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);
分享到:
评论

相关推荐

    Apache Storm-0.8.1 API 参考文档 ( Html版 )

    5. **Tuple**:Storm的基本数据结构,由一组键值对构成,用于在Spouts和Bolts之间传递数据。 二、API详解 1. **TopologyBuilder**:用于构建Topologies的类,提供了添加Spouts和Bolts以及定义Stream Groupings的...

    Storm API实现词频统计

    Storm由多个组件构成,包括Spout(数据源)、Bolt(数据处理逻辑)以及Topology(拓扑结构)。在这个词频统计案例中,Spout可能是我们自定义的Java程序,用于生成随机或特定的文本数据流;Bolt则负责对这些数据进行...

    Apache Storm-0.9.1 API 参考文档

    Apache Storm 是一个开源的分布式...总结,Apache Storm-0.9.1 API 参考文档为开发者提供了详细指导,涵盖了从基本概念、组件实现、拓扑构建到集群管理和性能调优的全方位知识,帮助开发者高效地构建实时数据处理系统。

    Apache Storm-0.8.1 api文档 (html)

    5. **任务(Tasks)**:任务是拓扑中的执行单元,每个任务对应拓扑中定义的一个 bolt 或 spout 的实例。任务由工作者进程托管,负责实际的数据处理工作。 6. **流(Stream)**:流是数据在 Storm 中的传输方式,...

    Storm 源码分析

    Storm的基本概念包括Topology、Spout、Bolt和Stream。其中,Topology是一个计算任务的定义,由一组Spout和Bolt组成;Spout作为数据源,负责发送数据;Bolt作为数据处理组件,可以接收多个Spout或其他Bolt的数据并...

    storm demo

    【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...

    vortex-storm:一个Apache Storm Spout和Bolt,用于使用和生成Vortex数据

    DDS是用于以数据为中心的连接的中间件协议和API标准,并且是唯一能够满足物联网(IoT)的高级要求的标准。 DDS提供了业务和关键任务IoT应用程序所需的低延迟数据连接,极高的可靠性和可扩展性。 有关更多信息,请...

    storm入门 PDF 下载

    7. **Java编程**:由于标签为"java",因此"storm入门"的学习中,会涉及到使用Java语言编写spout和bolt的逻辑,理解如何通过Java API与Storm交互。 8. **实时数据处理**:Storm非常适合实时数据分析,例如实时日志...

    storm的测试源码

    Storm的核心概念包括拓扑(Topology)、工作者(Worker)、节点(Task)以及 Bolt 和 Spout。拓扑是Storm中的工作单元,它由多个Spout和Bolt组成,定义了数据流的处理逻辑。Spout是数据流的来源,它可以是任何类型的...

    Storm的WordCount实例

    Storm由多个组件构成,包括Topology(拓扑结构)、Spout(数据源)和Bolt(处理逻辑)。Topology定义了数据流的处理方式,Spout负责产生数据流,而Bolt则执行实际的数据处理任务。在我们的WordCount例子中,Spout...

    JStorm 2.1.1 API

    在JStorm 2.1.1中,`ISpout`接口定义了Spout的基本行为,包括`nextTuple`(发布新数据)、`ack`(确认消息已被处理)和`fail`(处理失败的消息)。开发者可以自定义实现`ISpout`,创建满足特定数据源需求的Spout。 ...

    storm-kakfa使用state例子源码

    State API 是 Storm 提供的一种机制,允许 bolts(处理数据的基本单元)在执行过程中维护状态。这种状态可以是数据库连接、缓存或其他任何需要在处理过程中保持的数据。在这个例子中,State 被用来存储和更新从Kafka...

    storm-wordcount例子

    在"storm-wordcount"中,通常包括三个主要组件:Spout、Split Bolt和Count Bolt。 1. **Spout**:这是数据流的源头。在storm-wordcount中,Spout可能是从一个数据源(如Kafka、Twitter或者简单的静态文本文件)读取...

    storm-starter-master

    Storm的核心组件包括Spout和Bolt。Spout是数据源,它可以是从Kafka、Twitter等外部系统拉取数据,也可以是生成随机数据。Bolt则是处理逻辑的封装,负责数据清洗、聚合、过滤等操作。用户通过编写Spout和Bolt的代码,...

    Storm笔记-PPT

    1. **Tuple**:Storm的基本数据结构,类似于键值对,用于在Spout和Bolt之间传递数据。 2. **Topology**:一个Storm应用由多个Spout和Bolt组成,它们通过Tuples连接形成拓扑结构,定义数据流的处理逻辑。 3. **Stream...

    storm_jars.zip

    在实际使用中,Storm的JAR包分为两类:一类是Storm自身的JAR,如`storm-core.jar`,包含Storm的核心API,如Bolt、Spout等;另一类是依赖的第三方库,如`zookeeper-3.4.x.jar`,用于提供分布式协调服务,确保系统的高...

    使用Java开发的storm的小案例,实现随机数打印

    2. **添加Storm依赖**:在pom.xml文件中,我们需要添加Apache Storm的依赖,以便在项目中使用Storm的API。 3. **编写Spout和Bolt**:根据需求,创建自定义的Spout和Bolt类。Spout可能会包含一个生成随机数的方法,而...

    storm0.9.0jar包

    - **Topology**:这是Storm的基本计算单元,由多个Bolt和Spout组成,定义了数据流的处理逻辑。 - **Spout**:负责生成数据流,可以是从数据库、消息队列或其他数据源读取。 - **Bolt**:执行数据处理逻辑,如过滤、...

    02、Storm入门到精通storm3-1.pptx

    1. **Spout**: Spout是数据流的源头,它可以是从Kestrel队列读取数据,或者连接到Twitter API获取推文流。Spout负责产生并发出`Tuple`,即Storm的数据模型,这些数据流无边界且持续不断。 2. **Bolt**: Bolt则消费...

Global site tag (gtag.js) - Google Analytics