一、前言
对于不使用trident的人来说,使用基本的storm spout,bolt操作,需要理解storm的ack机制,保证消息的完整性,Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。
怎样才认为消息被完全处理?每个从 Spout发出的 Tuple可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。
这里我主要给不使用trident实现业务的同事讲如何实现可靠的spout,bolt。
二、实现可靠的spout
让我们先来看下ISpout接口的几个方法
public class ISpout接口测试 implements ISpout {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
/**
* 1、在任务集群的工作进程内被初始化,提供spout执行所需要的环境
* 2、conf参数是这个spout的strom配置,提供给拓扑与这台主机上的集群配置一起合并
* 3、context主要用来获取这个任务在拓扑中的位置信息,包括该任务的id,该任务的组件id,输入和输出消息等
* 4、collector是收集器,用于从spout发送元祖,收集器是线程安全的,应该作为这个spout对象的实例变量进行保存。
*
*/
}
@Override
public void close() {
/**
* 1、当ISpout关闭时被调用,不能保证close一定被调用,因为在集群中可以使用kill -9 直接杀死工作进程/本地模式除外
*/
}
@Override
public void activate() {
/**
* 当spout从失效模式中激活的时候被调用
*/
}
@Override
public void deactivate() {
/**
* 当spout已经失效的时候被调用,在失效期间,nextTuple()方法不会被调用
*/
}
@Override
public void nextTuple() {
/**
* 1、非阻塞,如果没有元祖可以发送,可休眠,不浪费CPU
* 2、发送元祖到输出收集器SpoutOutputCollector
*/
}
@Override
public void ack(Object msgId) {
/**
* 1、storm断定该spout发送的标识符msgId的元祖已经被成功处理时调用
* 2、ack()方法调用后将消息移除队列(之前的消息是挂起的)
*/
}
@Override
public void fail(Object msgId) {
/**
* 1、storm断定该spout发送的标识符msgId的元祖没有被成功处理时调用
* 2、fail()方法调用后将消息放入队列(之前的消息是挂起的)
*/
}
}
那么我们如何实现可靠的spout呢?
1. 在 nextTuple 函数中调用 emit 函数时需要带一个msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护,这点比较坑)
例子:
public class 可靠的spout implements ISpout{ private SpoutOutputCollector collector; @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void close() { } @Override public void activate() { } @Override public void deactivate() { } @Override public void nextTuple() { String curMsg = "发送消息"; String msgId = "发送消息"; //这里我假设MsgId和发送的消息一样,便于维护msgId和消息之间的对应关系 collector.emit(new Values(curMsg),msgId); } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { String tmp = (String)msgId; //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息 //消息进行重发 collector.emit(new Values(tmp), msgId); }
三、实现可靠的bolt
同样,先看看IBolt接口提供的几个方法
public class IBolt接口测试 implements IBolt{ @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { /** * 1、提供bolt运行的一些环境 */ } @Override public void execute(Tuple input) { /** * 1、一次处理一个输入的元祖,元祖对象包括来自哪个组件/流/任务的元数据 * 2、IBolt没有立即处理元祖,而是完整的捕获一个元祖并在以后进行处理 * 3、如果实现basicBolt则不用手动ack() */ } @Override public void cleanup() { /** * 1、当一个bolt即将关闭时调用,不能保证一定被调用,集群的kill -9 不行 * */ } /** * bolt的生命周期:在客户端主机上创建Ibolt对象,bolt被序列化到拓扑,并提及到nimbus,然后nimbus * 启动工作进程(worker)进行反序列化,调用其prepare()方法开始处理 */ }
那我们如何实现可靠的bolt呢,主要有2种方式
3.1 继承 BaseBasicBolt
public final class 第一种可靠的bolt extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split("\\s+")) {
//storm自动ack和fail
collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
对于继承BaseBasicBolt的Bolt来说,storm内部已经替我们自动ack和fail了,不需我们手动ack,然而这个抽象类不太使用,使用场景单一。
3.2 继承 BaseRichBolt
package com.storm.bolt.可靠性; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class 可靠的bolt extends BaseRichBolt { private static final long serialVersionUID = 1L; OutputCollector _collector; @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this._collector = collector; } @Override public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split("\\s+")) { // 建立 anchor 树 _collector.emit(tuple, new Values(word)); } //手动ack _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
需要我们自己手动ack,但其适应场景更广泛。
四、ack原理
对于每个spout tuple保存一个ack_val值,初始值为0,然后每发射一个tuple或者ack 一个tuple, tuple的 id都要和这个校验值(ack_val)异或,并更新ack_val,如果每个发射出去的tuple都ack了,那么最后ack_val一定是0。
五、ack流程
1、spout发射消息生成一个messageId对象{属性Map<RootId,消息ID>}放入pendingMap中,在未超时时间内保留
2、spout发出消息后给acker bolt(ack其实也是一个特殊的bolt)发射tuple消息 {tuple-id,ack_val,task_id}:
tupe_id:实际上就是rootId
ack_val:刚开始为初始值0
task_id:为spout的id,这样acker才知道是哪个spout发射过来的,如果有多个acker,那么根据task_id哈希取模也能找到对应的acker,保证同一个spout发射出来的消息被同一个acker处理.然后acker bolt从自己的pending对象中新增一条记录{tuple_id,{task_id,ack_val}}
3、bolt接收到消息后(该bolt可能是第一个也可能是最后一个,原理都一样),发射消息给下一个task的过程中也会构建一个MessageId对象,messageId中会进行消息ID(本身消息id)和接收到的消息ID(上一个bolt或者spout传过来)进行异或得到ack_val发给acker.
4、acker收的后根据tuple_id从penging中取出旧的ack_val然后进行异或。
5、继续bolt处理...........
6、如果最终异或结果为0,调用spout的ack方法,如果失败,调用fail方法。
5.1 例子:
1:spout产生一个tuple,初始值0100,同时发送给ack和Bolt1 {acker 值 0100}
2:bolt1接收spout发送过来的0100消息,经过处理后产生了新消息0010,那么bolt1就讲0100^0010发送给acker {
acker值 0100^0010 = 0110
0110^0100=0010
}
3:bolt2接收bolt1发送过来的消息,没有产生任何消息(直接持久化了),那么Bolt2将bolt1的消息 0010发送给acker {
acker值 0100^0010 = 0110
0110^0100=0010
0010^0010=0000
}
4:acker进行整个流程的异或操作 {acker求最终的异或值}
相关推荐
【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...
Topology是Storm中的核心概念,它定义了数据如何在Spout和Bolt之间流动。在这个案例中,我们可以创建一个`WordCountTopology`,其中包含一个Spout和两个Bolt,用DAG(有向无环图)的形式连接它们:Spout -> ...
这部分详细讲解了Spout和Bolt的实现细节,包括它们的核心方法、生命周期管理,以及如何编写自定义的Spout和Bolt。 通过以上步骤,开发者可以深入理解Storm的实时数据处理机制,并具备开发实际应用的能力。在实践中...
通过这个简单的“storm统计单词数”的demo,初学者可以了解Storm的基本操作,包括编写Spout和Bolt,构建Topology,以及如何在本地和集群中运行。同时,这个例子也展示了Storm在处理实时数据流时的灵活性和强大功能。...
7. **Java编程**:由于标签为"java",因此"storm入门"的学习中,会涉及到使用Java语言编写spout和bolt的逻辑,理解如何通过Java API与Storm交互。 8. **实时数据处理**:Storm非常适合实时数据分析,例如实时日志...
**Mutilang 协议**:Storm 支持使用非 JVM 语言编写 Spout 和 Bolt,通过 Mutilang 协议进行通信。 **初始握手**:建立 Spout 或 Bolt 与 Storm 集群之间的连接。 **启动循环并读写元组**:实现非 JVM 语言的 ...
Storm的核心概念包括拓扑(Topology)、工作者(Worker)、节点(Task)以及 Bolt 和 Spout。拓扑是Storm中的工作单元,它由多个Spout和Bolt组成,定义了数据流的处理逻辑。Spout是数据流的来源,它可以是任何类型的...
用户通过编写Spout和Bolt的代码,定义自己的数据处理流程。 在"storm-starter-master"这个项目中,通常会包含一系列的示例,如简单的单词计数(WordCount)、日志分析等,这些示例有助于初学者理解如何在Storm中...
总结来说,Apache Storm是一个功能强大的实时计算框架,它支持可扩展、可靠和容错的实时数据处理。这本书为初学者提供了一条学习和掌握Storm的途径,从安装配置到理解其架构、组件、操作模式和高级特性,再到编写和...
Apache Storm的核心概念包括:拓扑(Topology)、工作者(Worker)、节点(Spout)和 bolt(Bolt)。拓扑是 Storm 应用的基本结构,由多个节点和bolt组成,它们之间通过流(Stream)进行连接。节点负责产生数据流(Spout),而...
3. **编写Spout和Bolt**:根据需求,创建自定义的Spout和Bolt类。Spout可能会包含一个生成随机数的方法,而Bolt则接收这些随机数并打印出来。 4. **定义Topology**:在Java代码中,定义Topology,连接Spout和Bolt,...
8. **开发与部署**:书中会涵盖开发环境的搭建、编写Spout和Bolt、测试及优化,以及在生产环境中部署和监控Storm集群的方法。 9. **与其他系统的集成**:Storm可以与Hadoop、Cassandra、Kafka等其他大数据技术集成...
1. **定义拓扑**:编写Spout和Bolt,定义它们之间的连接关系,以及数据流的处理逻辑。 2. **提交拓扑**:使用`StormSubmitter`类将拓扑提交到Storm集群。集群可以是本地模式(仅用于测试),也可以是分布式模式。 ...
Storm是Apache的一个分布式实时计算系统,它允许开发者编写能够处理无界数据流的复杂事件处理逻辑。在Storm中,核心概念是`拓扑(Topology)`,它代表了数据处理的计算图。拓扑由节点(Node)组成,这些节点可以是`...
此外,书中的实例部分将展示如何编写自定义的Spout和Bolt,以及如何使用 Trident API 实现更复杂的实时处理逻辑。Trident是Storm的一个高级接口,它提供了状态管理和事务性保证,使得实时处理更加稳定可靠。 书中还...
3. **编写代码**:用Java或Clojure编写Spout和Bolt,实现具体的数据处理逻辑。 4. **本地模式**:在本地环境中运行拓扑,进行测试和调试。 5. **提交到集群**:了解如何配置和提交拓扑到真实的Storm集群,以处理大...
Storm 提供了一个灵活的编程模型,允许开发者使用任何编程语言编写Spout和Bolt。这意味着开发者可以根据具体的需求选择最适合的语言来实现计算逻辑。 #### 使用场景 Storm 非常适合以下几种场景: - **实时数据分析...
Storm的编程和开发通常涉及到编写Spout和Bolt,组装成一个Topology,然后提交给Nimbus。例如,使用TopologyBuilder类来构建Topology,配置执行的任务数(Number of Workers),并使用StormSubmitter类提交Topology到...