`

Twitter Storm源代码分析之acker工作流程

 
阅读更多

转载:http://xumingming.sinaapp.com/410/twitter-storm-code-analysis-acker-merchanism/

Twitter Storm源代码分析之acker工作流程

 

概述

我们知道storm一个很重要的特性是它能够保证你发出的每条消息都会被完整处理, 完整处理的意思是指:

一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。

也就是说对于任何一个spout-tuple以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看Twitter Storm如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm里面有个专门的acker来跟踪所有tuple的完成情况。这篇文章就来讨论acker的详细工作流程。

源代码列表

这篇文章涉及到的源代码主要包括:

  1. backtype.storm.daemon.acker
  2. backtype.storm.daemon.task
  3. backtype.storm.task.OutputCollectorImpl

算法简介

acker对于tuple的跟踪算法是storm的主要突破之一, 这个算法使得对于任意大的一个tuple树, 它只需要恒定的20字节就可以进行跟踪了。原理很简单:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。

进入正题

那么下面我们从源代码层面来看看哪些组件在哪些时候会给acker发送什么样的消息来共同完成这个算法的。acker对消息进行处理的主要是下面这块代码:

01
02
03
04
05
06
07
08
09
10
11
(let [id (.getValue tuple 0)
^TimeCacheMap pending @pending
curr (.get pending id)
curr (condp = (.getSourceStreamId tuple)
ACKER-INIT-STREAM-ID (->; curr
(update-ack id)
(assoc :spout-task (.getValue tuple 1)))
ACKER-ACK-STREAM-ID (update-ack
curr (.getValue tuple 1))
ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
...)

Spout创建一个新的tuple的时候给acker发送消息

消息格式(看上面代码的第1行和第7行对于tuple.getValue()的调用)

1
(spout-tuple-id, task-id)

消息的streamId是__ack_init(ACKER-INIT-STREAM-ID)

这是告诉acker, 一个新的spout-tuple出来了, 你跟踪一下,它是由id为task-id的task创建的(这个task-id在后面会用来通知这个task:你的tuple处理成功了/失败了)。处理完这个消息之后, acker会在它的pending这个map(类型为TimeCacheMap)里面添加这样一条记录:

1
{spout-tuple-id {:spout-task task-id :val ack-val)}

这就是acker对spout-tuple进行跟踪的核心数据结构, 对于每个spout-tuple所产生的tuple树的跟踪都只需要保存上面这条记录。acker后面会检查:val什么时候变成0,变成0, 说明这个spout-tuple产生的tuple都处理完成了。

Bolt发射一个新tuple的时候会给acker发送消息么?

任何一个bolt在发射一个新的tuple的时候,是不会直接通知acker的,如果这样做的话那么每发射一个消息会有三条消息了:

  1. Bolt创建这个tuple的时候,把它发给下一个bolt的消息
  2. Bolt创建这个tuple的时候,发送给acker的消息
  3. ack tuple的时候发送的ack消息

事实上storm里面只有第一条和第三条消息,它把第二条消息省掉了, 怎么做到的呢?storm这点做得挺巧妙的,bolt在发射一个新的bolt的时候会把这个新tuple跟它的父tuple的关系保存起来。然后在ack每个tuple的时候,storm会把要ack的tuple的id, 以及这个tuple新创建的所有的tuple的id的异或值发送给acker。这样就给每个tuple省掉了一个消息(具体看下一节)。

Tuple被ack的时候给acker发送消息

每个tuple在被ack的时候,会给acker发送一个消息,消息格式是:

1
(spout-tuple-id, tmp-ack-val)

消息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)

注意,这里的tmp-ack-val是要ack的tuple的id与由它新创建的所有的tuple的id异或的结果:

1
tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )

我们可以从task.clj里面的send-ack方法看出这一点:

01
02
03
04
05
06
07
08
09
10
11
12
13
(defn- send-ack [^TopologyContext topology-context
^Tuple input-tuple
^List generated-ids send-fn]
(let [ack-val (bit-xor-vals generated-ids)]
(doseq [
[anchor id] (.. input-tuple
getMessageId
getAnchorsToIds)]
(send-fn (Tuple. topology-context
[anchor (bit-xor ack-val id)]
(.getThisTaskId topology-context)
ACKER-ACK-STREAM-ID))
)))

这里面的generated-ids参数就是这个input-tuple的所有子tuple的id, 从代码可以看出storm会给这个tuple的每一个spout-tuple发送一个ack消息。

为什么说这里的generated-ids是input-tuple的子tuple呢? 这个send-ack是被OutputCollectorImpl里面的ack方法调用的:

1
2
3
4
5
6
7
public void ack(Tuple input) {
List generated = getExistingOutput(input);
// don't just do this directly in case
// there was no output
_pendingAcks.remove(input);
_collector.ack(input, generated);
}

generated是由getExistingOutput(input)方法计算出来的, 我们再来看看这个方法的定义:

1
2
3
4
5
6
7
8
9
private List getExistingOutput(Tuple anchor) {
if(_pendingAcks.containsKey(anchor)) {
return _pendingAcks.get(anchor);
} else {
List ret = new ArrayList();
_pendingAcks.put(anchor, ret);
return ret;
}
}

_pendingAcks里面存的是什么东西呢?

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private Tuple anchorTuple(Collection< Tuple > anchors,
String streamId,
List< Object > tuple) {
// The simple algorithm in this function is the key
// to Storm. It is what enables Storm to guarantee
// message processing.
// 这个map存的东西是 spout-tuple-id到ack-val的映射
Map< Long, Long > anchorsToIds
= new HashMap<Long, Long>();
// anchors 其实就是它的所有父亲:spout-tuple
if(anchors!=null) {
for(Tuple anchor: anchors) {
long newId = MessageId.generateId();
// 告诉每一个父亲,你们又多了一个儿子了。
getExistingOutput(anchor).add(newId);
for(long root: anchor.getMessageId()
.getAnchorsToIds().keySet()) {
Long curr = anchorsToIds.get(root);
if(curr == null) curr = 0L;
 
// 更新spout-tuple-id的ack-val
anchorsToIds.put(root, curr ^ newId);
}
}
}
return new Tuple(_context, tuple,
_context.getThisTaskId(),
streamId,
MessageId.makeId(anchorsToIds));
}

从上面代码里面的红色部分我们可以看出, _pendingAcks里面维护的其实就是tuple到自己儿子的对应关系。

Tuple处理失败的时候会给acker发送失败消息

acker会忽略这种消息的消息内容(消息的streamId为ACKER-FAIL-STREAM-ID), 直接将对应的spout-tuple标记为失败(最上面代码第9行)

最后Acker发消息通知spout-tuple对应的Worker

最后, acker会根据上面这些消息的处理结果来通知这个spout-tuple对应的task:

<!-- .entry-utility -->
<!-- #post-## -->
分享到:
评论

相关推荐

    Storm Acker机制

    自诞生之日起,Storm就以独特的设计理念、优良的产品质量表现及年轻、积极、性感的品牌形象迅速吸引了一大批品牌簇拥者。Storm仅仅用了几年时间,就跻身伦敦时尚的前沿,迅速成为时尚腕表的领航者。

    storm利用ack保证数据的可靠性源码

    在分布式计算领域,Apache Storm是一个实时处理系统,它允许开发者处理和分析连续的数据流。Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个...

    apache-storm-2.1.0-src.tar.gz

    这个“apache-storm-2.1.0-src.tar.gz”文件包含了Apache Storm 2.1.0版本的源代码,允许开发者深入理解其内部工作原理,进行定制化开发或优化性能。 Apache Storm的核心概念包括以下几个方面: 1. **拓扑...

    Flink,Storm,Spark Streaming三种流框架的对比分析

    Flink、Storm、Spark Streaming三种流框架的对比分析 Flink架构及特性分析 Flink是一个原生的流处理系统,提供高级的API。Flink也提供API来像Spark一样进行批处理,但两者处理的基础是完全不同的。Flink把批处理...

    storm企业应用 实战 运维和调优

    Storm集群是由一个主节点(Nimbus)和多个工作节点(Supervisor)构成的。Nimbus负责任务分配,而Supervisor负责执行任务。Storm的处理单元是Topology,它包含了Spout和Bolt。Spout负责数据的输入,而Bolt负责处理...

    storm中文学习资料

    在这个压缩包中,我们可以预期找到一系列帮助理解和掌握Storm技术的文档、教程或者代码示例。 描述虽然简洁,但暗示了这个压缩包的内容可能包括基础概念讲解、配置教程、实战案例分析等,旨在帮助用户从零开始学习...

    Apache Storm-0.8.1 api文档 (html)

    Storm 的核心概念包括拓扑、工作者、任务、 bolts 和 spouts,这些组件共同构成了一个强大的实时处理框架。 在 Apache Storm 0.8.1 版本中,API 文档提供了对这些关键概念的深入理解。以下是一些主要知识点: 1. *...

    Storm实战构建大数据实时计算

    - **拓扑(Topology)**:在Storm中,一个拓扑是由多个Bolt和Spout组成的数据处理流程。Spout负责生成数据流,而Bolt则用于处理和转换数据。 - **工作者(Worker)**:运行在节点上的进程,负责执行拓扑中的任务...

    storm之Tuple元组分词操作Java代码.zip

    综上所述,"storm之Tuple元组分词操作Java代码"这个主题涵盖了Storm的核心概念,如tuples、Bolts、Spouts以及在Java环境中实现分词处理的细节,对于理解和实践Storm实时流处理具有重要意义。通过学习和理解这些代码...

    02、Storm入门到精通storm3-0.pptx

    在本节中,我们将深入探讨Storm的核心概念和特性,包括它的记录级容错原理、配置详解、批处理、TOPN操作、流程聚合、DRPC(Direct RPC)以及executor、worker、task之间的关系和调优。 **Storm记录级容错原理** ...

    Apache+Storm+快速起步.pdf 亲测 好评哦

    Storm提供了acker机制来确保每个消息都会被完全处理,即使在组件失败或者机器崩溃的情况下也不会丢失消息。同时,Storm还支持消息的事务,保证消息的处理是一致的。 Storm的快速起步还包括理解其性能优化。由于...

    storm1.2.1-helloword可靠消息

    Storm由多个组件构成,包括Nimbus(集群协调者)、Supervisor(工作节点管理器)、Worker进程(运行拓扑任务)以及Spout(数据源)和Bolt(数据处理器)。在“Hello World”示例中,我们将创建一个简单的拓扑,该...

    Storm深入学习.pdf

    Storm 允许 Spout 在发射源 Tuple 时指定一个 Message ID,这个 ID 可以是任意对象,用于追踪消息处理的完整性和一致性。记录级容错意味着 Storm 能够告诉用户每个消息单元是否在设定的时间内得到了完全处理。完全...

    storm集群搭建Java客户端测试代码.zip

    在本压缩包“storm集群搭建Java客户端测试代码.zip”中,包含了有关Apache Storm集群的搭建教程以及使用Java客户端进行测试的代码示例。Apache Storm是一个分布式实时计算系统,它允许开发者处理无界数据流,常用于...

    storm记录级容错.docx

    **Storm记录级容错机制详解** Storm作为实时流处理框架,提供了一种高效且精确的一次处理(Exactly-once processing guarantee)方式,即记录级容错。这种机制确保每个消息单元在拓扑中的处理状态可追踪,从而实现...

    第3章storm1

    **第3章 Storm** Storm是实时大数据处理的重要框架,它为分布式、容错且可扩展的数据流处理提供了强大的支持。本章将深入探讨Storm的架构、编程模型以及一个物联网WiFi项目的实现。 **3.1 Storm架构** Storm的...

    从零开始学Storm 第2版

    Storm是一个开源的、用于处理大规模流式数据的工具,广泛应用于实时分析、在线机器学习、持续计算、系统集成等多个领域。本书的第二版更新了最新的技术动态,提供了更丰富的实战案例和更详尽的讲解。 Storm的核心...

    Storm的集群搭建实战课程代码和PPT.rar

    【标题】:Storm的集群搭建实战课程代码和PPT 【描述】:这份资源包含了关于Storm集群搭建的实战课程代码和配套的PPT讲解材料,是学习和掌握Apache Storm分布式流处理系统的重要参考资料。 【标签】:代码 【知识...

    storm提交topology的过程共1页.pdf.zip

    在Storm中,topology是一个逻辑工作单元,它定义了数据流的处理方式,包括从spout(数据源)到bolt(处理组件)的数据流图。提交topology到Storm集群主要包括以下几个步骤: 1. **构建topology**:首先,开发者需要...

Global site tag (gtag.js) - Google Analytics