概述
我们知道storm一个很重要的特性是它能够保证你发出的每条消息都会被完整处理, 完整处理的意思是指:
一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。
也就是说对于任何一个spout-tuple以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看Twitter Storm如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm里面有个专门的acker来跟踪所有tuple的完成情况。这篇文章就来讨论acker的详细工作流程。
源代码列表
这篇文章涉及到的源代码主要包括:
算法简介
acker对于tuple的跟踪算法是storm的主要突破之一, 这个算法使得对于任意大的一个tuple树, 它只需要恒定的20字节就可以进行跟踪了。原理很简单:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。
进入正题
那么下面我们从源代码层面来看看哪些组件在哪些时候会给acker发送什么样的消息来共同完成这个算法的。acker对消息进行处理的主要是下面这块代码:
01
02
03
04
05
06
07
08
09
10
11
|
( let [ id (.getValue tuple 0)
^TimeCacheMap pending @pending
curr (.get pending id)
curr (condp = (.getSourceStreamId tuple)
ACKER-INIT-STREAM-ID (-> ; curr
(update-ack id)
(assoc :spout-task (.getValue tuple 1)))
ACKER-ACK-STREAM-ID (update-ack
curr (.getValue tuple 1))
ACKER-FAIL-STREAM-ID (assoc curr :failed true)) ]
...)
|
Spout创建一个新的tuple的时候给acker发送消息
消息格式(看上面代码的第1行和第7行对于tuple.getValue()
的调用)
1
|
(spout-tuple-id, task-id) |
消息的streamId是__ack_init(ACKER-INIT-STREAM-ID)
这是告诉acker, 一个新的spout-tuple出来了, 你跟踪一下,它是由id为task-id的task创建的(这个task-id在后面会用来通知这个task:你的tuple处理成功了/失败了)。处理完这个消息之后, acker会在它的pending这个map(类型为TimeCacheMap)里面添加这样一条记录:
1
|
{spout-tuple-id { :spout-task task-id :val ack-val)}
|
这就是acker对spout-tuple进行跟踪的核心数据结构, 对于每个spout-tuple所产生的tuple树的跟踪都只需要保存上面这条记录。acker后面会检查:val什么时候变成0,变成0, 说明这个spout-tuple产生的tuple都处理完成了。
Bolt发射一个新tuple的时候会给acker发送消息么?
任何一个bolt在发射一个新的tuple的时候,是不会直接通知acker的,如果这样做的话那么每发射一个消息会有三条消息了:
- Bolt创建这个tuple的时候,把它发给下一个bolt的消息
Bolt创建这个tuple的时候,发送给acker的消息- ack tuple的时候发送的ack消息
事实上storm里面只有第一条和第三条消息,它把第二条消息省掉了, 怎么做到的呢?storm这点做得挺巧妙的,bolt在发射一个新的bolt的时候会把这个新tuple跟它的父tuple的关系保存起来。然后在ack每个tuple的时候,storm会把要ack的tuple的id, 以及这个tuple新创建的所有的tuple的id的异或值发送给acker。这样就给每个tuple省掉了一个消息(具体看下一节)。
Tuple被ack的时候给acker发送消息
每个tuple在被ack的时候,会给acker发送一个消息,消息格式是:
1
|
(spout-tuple-id, tmp-ack-val) |
消息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)
注意,这里的tmp-ack-val是要ack的tuple的id与由它新创建的所有的tuple的id异或的结果:
1
|
tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... ) |
我们可以从task.clj里面的send-ack方法看出这一点:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
( defn - send -ack [ ^TopologyContext topology-context
^Tuple input-tuple
^ List generated-ids send - fn ]
( let [ ack-val (bit-xor-vals generated-ids) ]
( doseq [
[ anchor id ] (.. input-tuple
getMessageId
getAnchorsToIds) ]
( send - fn (Tuple. topology-context
[ anchor (bit-xor ack-val id) ]
(.getThisTaskId topology-context)
ACKER-ACK-STREAM-ID))
)))
|
这里面的generated-ids
参数就是这个input-tuple的所有子tuple的id, 从代码可以看出storm会给这个tuple的每一个spout-tuple发送一个ack消息。
为什么说这里的generated-ids
是input-tuple的子tuple呢? 这个send-ack是被OutputCollectorImpl里面的ack方法调用的:
1
2
3
4
5
6
7
|
public void ack(Tuple input) {
List generated = getExistingOutput(input);
// don't just do this directly in case
// there was no output
_pendingAcks.remove(input);
_collector.ack(input, generated);
} |
generated是由getExistingOutput(input)
方法计算出来的, 我们再来看看这个方法的定义:
1
2
3
4
5
6
7
8
9
|
private List getExistingOutput(Tuple anchor) {
if (_pendingAcks.containsKey(anchor)) {
return _pendingAcks.get(anchor);
} else {
List ret = new ArrayList();
_pendingAcks.put(anchor, ret);
return ret;
}
} |
_pendingAcks
里面存的是什么东西呢?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
private Tuple anchorTuple(Collection< Tuple > anchors,
String streamId,
List< Object > tuple) {
// The simple algorithm in this function is the key
// to Storm. It is what enables Storm to guarantee
// message processing.
// 这个map存的东西是 spout-tuple-id到ack-val的映射
Map< Long, Long > anchorsToIds
= new HashMap<Long, Long>();
// anchors 其实就是它的所有父亲:spout-tuple
if (anchors!= null ) {
for (Tuple anchor: anchors) {
long newId = MessageId.generateId();
// 告诉每一个父亲,你们又多了一个儿子了。
getExistingOutput(anchor).add(newId);
for ( long root: anchor.getMessageId()
.getAnchorsToIds().keySet()) {
Long curr = anchorsToIds.get(root);
if (curr == null ) curr = 0L;
// 更新spout-tuple-id的ack-val
anchorsToIds.put(root, curr ^ newId);
}
}
}
return new Tuple(_context, tuple,
_context.getThisTaskId(),
streamId,
MessageId.makeId(anchorsToIds));
} |
从上面代码里面的红色部分我们可以看出, _pendingAcks
里面维护的其实就是tuple到自己儿子的对应关系。
Tuple处理失败的时候会给acker发送失败消息
acker会忽略这种消息的消息内容(消息的streamId为ACKER-FAIL-STREAM-ID
), 直接将对应的spout-tuple标记为失败(最上面代码第9行)
最后Acker发消息通知spout-tuple对应的Worker
最后, acker会根据上面这些消息的处理结果来通知这个spout-tuple对应的task:
相关推荐
自诞生之日起,Storm就以独特的设计理念、优良的产品质量表现及年轻、积极、性感的品牌形象迅速吸引了一大批品牌簇拥者。Storm仅仅用了几年时间,就跻身伦敦时尚的前沿,迅速成为时尚腕表的领航者。
在分布式计算领域,Apache Storm是一个实时处理系统,它允许开发者处理和分析连续的数据流。Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个...
这个“apache-storm-2.1.0-src.tar.gz”文件包含了Apache Storm 2.1.0版本的源代码,允许开发者深入理解其内部工作原理,进行定制化开发或优化性能。 Apache Storm的核心概念包括以下几个方面: 1. **拓扑...
Flink、Storm、Spark Streaming三种流框架的对比分析 Flink架构及特性分析 Flink是一个原生的流处理系统,提供高级的API。Flink也提供API来像Spark一样进行批处理,但两者处理的基础是完全不同的。Flink把批处理...
Storm集群是由一个主节点(Nimbus)和多个工作节点(Supervisor)构成的。Nimbus负责任务分配,而Supervisor负责执行任务。Storm的处理单元是Topology,它包含了Spout和Bolt。Spout负责数据的输入,而Bolt负责处理...
在这个压缩包中,我们可以预期找到一系列帮助理解和掌握Storm技术的文档、教程或者代码示例。 描述虽然简洁,但暗示了这个压缩包的内容可能包括基础概念讲解、配置教程、实战案例分析等,旨在帮助用户从零开始学习...
Storm 的核心概念包括拓扑、工作者、任务、 bolts 和 spouts,这些组件共同构成了一个强大的实时处理框架。 在 Apache Storm 0.8.1 版本中,API 文档提供了对这些关键概念的深入理解。以下是一些主要知识点: 1. *...
- **拓扑(Topology)**:在Storm中,一个拓扑是由多个Bolt和Spout组成的数据处理流程。Spout负责生成数据流,而Bolt则用于处理和转换数据。 - **工作者(Worker)**:运行在节点上的进程,负责执行拓扑中的任务...
综上所述,"storm之Tuple元组分词操作Java代码"这个主题涵盖了Storm的核心概念,如tuples、Bolts、Spouts以及在Java环境中实现分词处理的细节,对于理解和实践Storm实时流处理具有重要意义。通过学习和理解这些代码...
在本节中,我们将深入探讨Storm的核心概念和特性,包括它的记录级容错原理、配置详解、批处理、TOPN操作、流程聚合、DRPC(Direct RPC)以及executor、worker、task之间的关系和调优。 **Storm记录级容错原理** ...
Storm提供了acker机制来确保每个消息都会被完全处理,即使在组件失败或者机器崩溃的情况下也不会丢失消息。同时,Storm还支持消息的事务,保证消息的处理是一致的。 Storm的快速起步还包括理解其性能优化。由于...
Storm由多个组件构成,包括Nimbus(集群协调者)、Supervisor(工作节点管理器)、Worker进程(运行拓扑任务)以及Spout(数据源)和Bolt(数据处理器)。在“Hello World”示例中,我们将创建一个简单的拓扑,该...
Storm 允许 Spout 在发射源 Tuple 时指定一个 Message ID,这个 ID 可以是任意对象,用于追踪消息处理的完整性和一致性。记录级容错意味着 Storm 能够告诉用户每个消息单元是否在设定的时间内得到了完全处理。完全...
在本压缩包“storm集群搭建Java客户端测试代码.zip”中,包含了有关Apache Storm集群的搭建教程以及使用Java客户端进行测试的代码示例。Apache Storm是一个分布式实时计算系统,它允许开发者处理无界数据流,常用于...
**Storm记录级容错机制详解** Storm作为实时流处理框架,提供了一种高效且精确的一次处理(Exactly-once processing guarantee)方式,即记录级容错。这种机制确保每个消息单元在拓扑中的处理状态可追踪,从而实现...
**第3章 Storm** Storm是实时大数据处理的重要框架,它为分布式、容错且可扩展的数据流处理提供了强大的支持。本章将深入探讨Storm的架构、编程模型以及一个物联网WiFi项目的实现。 **3.1 Storm架构** Storm的...
Storm是一个开源的、用于处理大规模流式数据的工具,广泛应用于实时分析、在线机器学习、持续计算、系统集成等多个领域。本书的第二版更新了最新的技术动态,提供了更丰富的实战案例和更详尽的讲解。 Storm的核心...
【标题】:Storm的集群搭建实战课程代码和PPT 【描述】:这份资源包含了关于Storm集群搭建的实战课程代码和配套的PPT讲解材料,是学习和掌握Apache Storm分布式流处理系统的重要参考资料。 【标签】:代码 【知识...
在Storm中,topology是一个逻辑工作单元,它定义了数据流的处理方式,包括从spout(数据源)到bolt(处理组件)的数据流图。提交topology到Storm集群主要包括以下几个步骤: 1. **构建topology**:首先,开发者需要...