Storm guarantees that each message coming off a spout will be fully processed. This page describes how Storm accomplishes this guarantee and what you have to do as a user to benefit from Storm's reliability capabilities.
What does it mean for a message to be "fully processed"?
A tuple coming off a spout can trigger thousands of tuples to be created based on it. Consider, for example, the streaming word count topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
                                               22133,
                                               "sentence_queue",
                                               new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));
This topology reads sentences off of a Kestrel queue, splits each sentence into its constituent words, and then emits for each word the number of times it has seen that word before. A tuple coming off the spout triggers many tuples being created based on it: a tuple for each word in the sentence and a tuple for the updated count for each word. The resulting tree of messages has the sentence tuple at its root, with a word tuple for each word in the sentence and a count tuple hanging off of each word tuple.
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout. This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
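For example, a minimal sketch of raising the timeout for a topology (the 60-second value is arbitrary):

Config conf = new Config();
// Allow up to 60 seconds (instead of the default 30) for each tuple tree to complete.
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);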
What happens if a message is fully processed or fails to be fully processed?
To understand this question, let's take a look at the lifecycle of a tuple coming off of a spout. For reference, here is the interface that spouts implement (see the Javadoc for more information):
public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}
First, Storm requests a tuple from the Spout by calling the nextTuple method on the Spout. The Spout uses the SpoutOutputCollector provided in the open method to emit a tuple to one of its output streams. When emitting a tuple, the Spout provides a "message id" that will be used to identify the tuple later. For example, the KestrelSpout reads a message off of the Kestrel queue and emits as the "message id" the id provided by Kestrel for the message. Emitting a message to the SpoutOutputCollector looks like this:

_collector.emit(new Values("field1", "field2", 3), msgId);
Next, the tuple gets sent to consuming bolts and Storm takes care of tracking the tree of messages that is created. If Storm detects that a tuple is fully processed, Storm will call the ack method on the originating Spout task with the message id that the Spout provided to Storm. Likewise, if the tuple times out, Storm will call the fail method on the Spout. Note that a tuple will be acked or failed by the exact same Spout task that created it: even if a Spout is executing many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.
Let's use KestrelSpout again to see what a Spout needs to do to guarantee message processing. When KestrelSpout takes a message off the Kestrel queue, it "opens" the message. This means the message is not actually taken off the queue yet, but instead placed in a "pending" state while it waits for acknowledgement that the message is completed. While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects, all pending messages for that client are put back on the queue. When a message is opened, Kestrel provides the client with the data for the message as well as a unique id for the message. The KestrelSpout uses that exact id as the "message id" for the tuple when emitting the tuple to the SpoutOutputCollector. Sometime later on, when ack or fail is called on the KestrelSpout, the KestrelSpout sends an ack or fail message to Kestrel with the message id to take the message off the queue or have it put back on.
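To make the pattern concrete outside of Kestrel, here is a minimal sketch of a reliable queue-backed spout. The QueueClient and QueueMessage types and their open/ack/requeue methods are hypothetical stand-ins for whatever client library your queue provides:

public class ReliableQueueSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    QueueClient _queue; // hypothetical client for a Kestrel-like queue

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _queue = new QueueClient("queue-host", 22133, "sentence_queue");
    }

    public void nextTuple() {
        // "Open" a message: it stays on the queue in a pending state.
        QueueMessage msg = _queue.open();
        if (msg != null) {
            // Use the queue's own id as the message id so ack/fail can refer back to it.
            _collector.emit(new Values(msg.getData()), msg.getId());
        }
    }

    public void ack(Object msgId) {
        // The tuple tree completed: remove the message from the queue for good.
        _queue.ack(msgId);
    }

    public void fail(Object msgId) {
        // The tuple tree failed or timed out: put the message back for redelivery.
        _queue.requeue(msgId);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}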
What is Storm's reliability API?
There are two things you have to do as a user to benefit from Storm's reliability capabilities. First, you need to tell Storm whenever you're creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm's API provides a concise way of doing both of these tasks.
Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple. Let's use the following bolt as an example. This bolt splits a tuple containing a sentence into a tuple for each word:
public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));
        }
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
Each word tuple is anchored by specifying the input tuple as the first argument to emit. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple fails to be processed downstream. In contrast, let's look at what happens if the word tuple is emitted like this:
_collector.emit(new Values(word));
Emitting the word tuple this way causes it to be unanchored. If the tuple fails to be processed downstream, the root tuple will not be replayed. Depending on the fault-tolerance guarantees you need in your topology, sometimes it's appropriate to emit an unanchored tuple.
An output tuple can be anchored to more than one input tuple. This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts. Multi-anchoring is done by specifying a list of tuples rather than just a single tuple. For example:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
Multi-anchoring adds the output tuple into multiple tuple trees. Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs.
Storm's implementation works for DAGs as well as trees (pre-release it only worked for trees, and the name "tuple tree" stuck).
Anchoring is how you specify the tuple tree -- the next and final piece to Storm's reliability API is specifying when you've finished processing an individual tuple in the tuple tree. This is done by using the ack and fail methods on the OutputCollector. If you look back at the SplitSentence example, you can see that the input tuple is acked after all the word tuples are emitted.
You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree. For example, your application may choose to catch an exception from a database client and explicitly fail the input tuple. By failing the tuple explicitly, the spout tuple can be replayed faster than if you waited for the tuple to time out.
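As a sketch of that pattern (the Database type and its insert method are hypothetical placeholders for a real client):

public class PersistBolt extends BaseRichBolt {
    OutputCollector _collector;
    Database _db; // hypothetical database client

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        _db = new Database();
    }

    public void execute(Tuple tuple) {
        try {
            _db.insert(tuple.getString(0));
            _collector.ack(tuple);
        } catch (Exception e) {
            // Fail explicitly so the spout tuple is replayed right away
            // rather than waiting out the message timeout.
            _collector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}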
Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.
A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you. The SplitSentence example can be written as a BasicBolt as follows:
public class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.
In contrast, bolts that do aggregations or joins may delay acking a tuple until after they have computed a result based on a bunch of tuples. Aggregations and joins will commonly multi-anchor their output tuples as well. These things fall outside the simpler pattern of IBasicBolt.
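For illustration, here is a rough sketch of such a bolt: it buffers input tuples, emits a multi-anchored aggregate every 100 tuples (an arbitrary batch size), and only acks the inputs once the aggregate has been emitted. A production version would also flush on a timer so a partial batch doesn't time out:

public class BatchingCountBolt extends BaseRichBolt {
    OutputCollector _collector;
    List<Tuple> _pending = new ArrayList<Tuple>();
    Map<String, Integer> _counts = new HashMap<String, Integer>();

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = _counts.get(word);
        _counts.put(word, count == null ? 1 : count + 1);
        _pending.add(tuple);
        if (_pending.size() >= 100) {
            // Multi-anchor the aggregate to every input tuple that contributed to it.
            _collector.emit(_pending, new Values(new HashMap<String, Integer>(_counts)));
            // Only now is it safe to ack the buffered inputs.
            for (Tuple t : _pending) {
                _collector.ack(t);
            }
            _pending.clear();
            _counts.clear();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("counts"));
    }
}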
How do I make my applications work correctly given that tuples can be replayed?
As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more in the documentation on transactional topologies.
How does Storm implement reliability in an efficient way?
A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple. When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.
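For example, a minimal sketch of raising the acker count (the value 4 is arbitrary):

Config conf = new Config();
// Run four acker tasks instead of the default one.
conf.put(Config.TOPOLOGY_ACKERS, 4);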
The best way to understand Storm's reliability implementation is to look at the lifecycle of tuples and tuple DAGs. When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.
Every tuple knows the ids of all the spout tuples whose tuple trees it exists in. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me".
For example, if tuples "D" and "E" were created based on tuple "C", then when "C" is acked, "C" is removed from the tree at the same time that "D" and "E" are added to it. Because the removal and the additions happen in a single update, the tree can never be prematurely completed.
There are a few more details to how Storm tracks tuple trees. As mentioned already, you can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information?
Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees it exists within, it knows which acker tasks to communicate with.
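A minimal sketch of that mapping (an assumption about the exact arithmetic, not Storm's actual code):

public class AckerRouting {
    // Map a random 64-bit spout tuple id onto one of numAckerTasks acker tasks.
    static int ackerFor(long spoutTupleId, int numAckerTasks) {
        return (int) Long.remainderUnsigned(spoutTupleId, numAckerTasks);
    }
}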
Another detail of Storm is how the acker tasks track which spout tasks are responsible for each spout tuple they're tracking. When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.
Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs.
An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree.
When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed. Since tuple ids are random 64 bit numbers, the chances of an "ack val" accidentally becoming 0 is extremely small. If you work the math, at 10K acks per second, it will take 50,000,000 years until a mistake is made. And even then, it will only cause data loss if that tuple happens to fail in the topology.
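Here is a toy illustration of that bookkeeping, following the description above rather than Storm's actual acker code. A spout tuple fans out into one child tuple, and every id is XORed into the ack val once when created and once when acked:

public class AckValDemo {
    public static void main(String[] args) {
        java.util.Random rand = new java.util.Random();
        long spoutTupleId = rand.nextLong();
        long wordTupleId = rand.nextLong();

        long ackVal = 0;
        // The spout emits the root tuple: XOR its id in.
        ackVal ^= spoutTupleId;
        // A bolt acks the root and anchors one child: XOR the root out, the child in.
        ackVal ^= spoutTupleId ^ wordTupleId;
        // A downstream bolt acks the child with no new children: XOR it out.
        ackVal ^= wordTupleId;

        // Every id has been XORed in once and out once, so the ack val is 0
        // and the tree is complete.
        System.out.println(ackVal == 0); // prints true
    }
}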
Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:
- A tuple isn't acked because the task died: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
- Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed.
- Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.
As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.
Tuning reliability
Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks.
If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires fewer ids to be kept in each downstream tuple, reducing bandwidth usage.
There are three ways to remove reliability. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.
The second way is to remove reliability on a message-by-message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the SpoutOutputCollector.emit method.
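For example, echoing the earlier spout emit, omitting the message id turns tracking off for that tuple:

// No message id: Storm will not track the tuple tree for this spout tuple.
_collector.emit(new Values("field1", "field2", 3));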
Finally, if you don't care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they're not anchored to any spout tuples, they won't cause any spout tuples to fail if they aren't acked.