https://storm.apache.org/documentation/Transactional-topologies.html
NOTE: Transactional topologies have been deprecated -- use the Trident framework instead.
Storm guarantees data processing by providing an at-least-once processing guarantee. The most common question asked about Storm is "Given that tuples can be replayed, how do you do things like counting on top of Storm? Won't you overcount?"
Storm 0.7.0 introduces transactional topologies, which enable you to get exactly-once messaging semantics for pretty much any computation. So you can do things like counting in a fully accurate, scalable, and fault-tolerant way.
Like Distributed RPC, transactional topologies aren't so much a feature of Storm as they are a higher level abstraction built on top of Storm's primitives of streams, spouts, bolts, and topologies.
This page explains the transactional topology abstraction, how to use the API, and provides details as to its implementation.
Design 1
The core idea behind transactional topologies is to provide a strong ordering on the processing of data. The simplest manifestation of this, and the first design we'll look at, is processing the tuples one at a time and not moving on to the next tuple until the current tuple has been successfully processed by the topology.
Each tuple is associated with a transaction id. If the tuple fails and needs to be replayed, then it is emitted with the exact same transaction id. A transaction id is an integer that increments for every tuple, so the first tuple will have transaction id 1, the second id 2, and so on.
The strong ordering of tuples gives you the capability to achieve exactly-once semantics even in the case of tuple replay. Let's look at an example of how you would do this.
Suppose you want to do a global count of the tuples in the stream. Instead of storing just the count in the database, you store the count and the latest transaction id together as one value. When your code updates the count in the database, it should update the count only if the transaction id in the database differs from the transaction id of the tuple currently being processed. Consider the two cases:
- The transaction id in the database is different from the current transaction id: Because of the strong ordering of transactions, we know for sure that the current tuple isn't represented in that count. So we can safely increment the count and update the transaction id.
- The transaction id is the same as the current transaction id: Then we know that this tuple is already incorporated into the count and can skip the update. The tuple must have failed after updating the database but before reporting success back to Storm.
This logic and the strong ordering of transactions ensures that the count in the database will be accurate even if tuples are replayed. Credit for this trick of storing a transaction id in the database along with the value goes to the Kafka devs, particularly this design document.
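A minimal Python sketch of this trick, assuming an in-memory object stands in for the database row and that transaction ids arrive in strictly increasing order. The names `CountStore` and `apply` are hypothetical illustrations, not part of Storm's API. The usage replays a tuple whose database update succeeded but whose success report was lost, and the count stays correct.

```python
class CountStore:
    """Hypothetical stand-in for a database row holding (count, txid) together."""

    def __init__(self):
        self.count = 0
        self.txid = None  # transaction id of the last update applied

    def apply(self, txid, delta):
        """Apply delta at most once per transaction id.

        Relies on strong ordering: if the stored txid equals the current one,
        this transaction's update is already reflected in the count.
        """
        if self.txid == txid:
            return False  # replayed tuple: skip the update
        # The count and the txid are written together as one value.
        self.count += delta
        self.txid = txid
        return True


store = CountStore()
store.apply(1, 1)  # tuple with txid 1
store.apply(2, 1)  # tuple with txid 2; assume its success report to Storm is lost
store.apply(2, 1)  # Storm replays txid 2: the update is skipped
store.apply(3, 1)
print(store.count)  # 3
```

Note that the check is an equality test, not "less than": strong ordering guarantees the stored id is either the current one or an earlier one, so equality is all that needs to be distinguished.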
Furthermore, notice that the topology can safely update many sources of state in the same transaction and achieve exactly-once semantics. If there's a failure, any updates that already succeeded will skip on the retry, and any updates that failed will properly retry. For example, if you were processing a stream of tweeted urls, you could update a database that stores a tweet count for each url as well as a database that stores a tweet count for each domain.
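The url/domain example can be sketched the same way, assuming each key in each store keeps its own (count, txid) pair. `KeyedCountStore`, `process_tweeted_url`, and the `fail_before_domain` flag are hypothetical names used only to simulate a crash between the two updates; the replay then skips the url update that already landed and applies the domain update that did not.

```python
from urllib.parse import urlparse


class KeyedCountStore:
    """Hypothetical keyed store: each key maps to a (count, txid) pair."""

    def __init__(self):
        self.rows = {}

    def apply(self, key, txid, delta=1):
        count, last_txid = self.rows.get(key, (0, None))
        if last_txid == txid:
            return  # already applied in this transaction; skip on retry
        self.rows[key] = (count + delta, txid)

    def count(self, key):
        return self.rows.get(key, (0, None))[0]


url_counts = KeyedCountStore()
domain_counts = KeyedCountStore()


def process_tweeted_url(txid, url, fail_before_domain=False):
    url_counts.apply(url, txid)
    if fail_before_domain:
        raise RuntimeError("simulated crash between the two updates")
    domain_counts.apply(urlparse(url).netloc, txid)


process_tweeted_url(1, "http://example.com/a")
try:
    process_tweeted_url(2, "http://example.com/b", fail_before_domain=True)
except RuntimeError:
    # The failed tuple is replayed with the same transaction id.
    process_tweeted_url(2, "http://example.com/b")

print(url_counts.count("http://example.com/b"), domain_counts.count("example.com"))  # 1 2
```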
There is a significant problem, though, with this design of processing one tuple at a time. Having to wait for each tuple to be completely processed before moving on to the next one is horribly inefficient. It entails a huge number of database calls (at least one per tuple), and this design makes very little use of the parallelization capabilities of Storm. So it isn't very scalable.
Design 2
Instead of processing one tuple at a time, a better approach is to process a batch of tuples for each transaction. So if you're doing a global count, you would increment the count by the number of tuples in the entire batch. If a batch fails, you replay the exact batch that failed. Instead of assigning a transaction id to each tuple, you assign a transaction id to each batch, and the processing of the batches is strongly ordered. Here's a diagram of this design:
So if you're processing 1000 tuples per batch, your application will do 1000x fewer database operations than design 1. Additionally, it takes advantage of Storm's parallelization capabilities, as the computation for each batch can be parallelized.
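To make the arithmetic concrete, here is a small Python sketch that counts database writes under per-batch transaction ids. The function `count_in_batches` and its `replayed` parameter (txids whose exact batch is delivered a second time) are hypothetical; the database is again modeled as a single (count, txid) value.

```python
def count_in_batches(tuples, batch_size, replayed=()):
    """Design 2 sketch: one transaction id and one database update per batch.

    `replayed` lists txids whose batch is delivered a second time, as if the
    batch failed after the database write but before Storm saw it succeed.
    """
    stored_count, stored_txid = 0, None  # the (count, txid) value in the database
    db_writes = 0
    for txid, start in enumerate(range(0, len(tuples), batch_size), start=1):
        batch = tuples[start:start + batch_size]
        deliveries = 2 if txid in replayed else 1
        for _ in range(deliveries):  # a replay re-emits the exact same batch and txid
            if stored_txid != txid:
                stored_count += len(batch)  # add the whole batch's count at once
                stored_txid = txid
                db_writes += 1
    return stored_count, db_writes


tuples = list(range(10_000))
print(count_in_batches(tuples, 1))     # (10000, 10000): one write per tuple, as in design 1
print(count_in_batches(tuples, 1000))  # (10000, 10): 1000x fewer writes
```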
While this design is significantly better than design 1, it's still not as resource-efficient as possible. The workers in the topology spend a lot of time being idle waiting for the other portions of the computation to finish. For example, in a topology like this:
After bolt 1 finishes its portion of the processing, it will be idle until the rest of the bolts finish and the next batch can be emitted from the spout.
Design 3 (Storm's design)
A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. For example, when computing a global count, there are two parts to the computation:
- Computing the partial count for the batch
- Updating the global count in the database with the partial count
The computation of #2 needs to be strongly ordered across the batches, but there's no reason you shouldn't be able to pipeline the computation of the batches by computing #1 for many batches in parallel. So while batch 1 is working on updating the database, batches 2 through 10 can compute their partial counts.
Storm accomplishes this distinction by breaking the computation of a batch into two phases:
- The processing phase: this is the phase that can be done in parallel for many batches
- The commit phase: The commit phases for batches are strongly ordered. So the commit for batch 2 is not done until the commit for batch 1 has been successful.
The two phases together are called a "transaction". Many batches can be in the processing phase at a given moment, but only one batch can be in the commit phase. If there's any failure in the processing or commit phase for a batch, the entire transaction is replayed (both phases).
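The split between a parallel processing phase and a strictly ordered commit phase can be sketched in plain Python. This is only an illustration of the scheduling idea, assuming a thread pool stands in for Storm's parallel tasks; `partial_count`, `run_transactions`, and the `fail_commit_once` fault-injection parameter are hypothetical. A failed commit replays the whole transaction: the processing phase is redone, then the commit is retried before any later batch may commit.

```python
from concurrent.futures import ThreadPoolExecutor


def partial_count(batch):
    """Processing phase: may run for many batches at once."""
    return len(batch)


def run_transactions(batches, fail_commit_once=()):
    """Pipeline sketch: parallel processing phases, strictly ordered commits.

    Transaction ids are 1-based positions in `batches`. `fail_commit_once`
    names txids whose first commit attempt fails (hypothetical fault injection).
    """
    db = {"count": 0, "txid": None}
    pending_failures = set(fail_commit_once)
    commit_order = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        # The processing phase of every batch is submitted up front and runs in parallel.
        futures = {txid: pool.submit(partial_count, b)
                   for txid, b in enumerate(batches, start=1)}
        for txid in sorted(futures):  # commit phase: one batch at a time, in txid order
            while True:
                partial = futures[txid].result()
                try:
                    if txid in pending_failures:
                        pending_failures.discard(txid)
                        raise RuntimeError("simulated commit failure")
                    if db["txid"] != txid:  # same txid check as in design 1
                        db["count"] += partial
                        db["txid"] = txid
                    commit_order.append(txid)
                    break
                except RuntimeError:
                    # Replay the whole transaction: redo processing, then retry the commit.
                    futures[txid] = pool.submit(partial_count, batches[txid - 1])
    return db["count"], commit_order


print(run_transactions([[1, 2, 3], [4, 5], [6, 6, 6, 6]], fail_commit_once={2}))  # (9, [1, 2, 3])
```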
Design details
When using transactional topologies, Storm does the following for you:
- Manages state: Storm stores in Zookeeper all the state necessary to do transactional topologies. This includes the current transaction id as well as the metadata defining the parameters for each batch.
- Coordinates the transactions: Storm will manage everything necessary to determine which transactions should be processing or committing at any point.
- Fault detection: Storm leverages the acking framework to efficiently determine when a batch has been successfully processed, has been successfully committed, or has failed. Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
- First class batch processing API: Storm layers an API on top of regular bolts to allow for batch processing of tuples. Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction. Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).
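The bookkeeping behind a batch processing API can be illustrated with a Python sketch loosely modeled on the shape of Storm's batch bolts (a per-tuple execute and a finishBatch callback). `PartialCountBolt` and `BatchRouter` are hypothetical stand-ins, not Storm classes, and the sketch assumes the router is told in advance how many tuples each transaction contains; Storm performs this coordination itself. Each transaction gets its own bolt instance, and that instance's state is dropped once its batch is finished.

```python
class PartialCountBolt:
    """Hypothetical batch bolt: a fresh instance is created per transaction."""

    def __init__(self, txid):
        self.txid = txid
        self.count = 0  # per-transaction state (the partial count)

    def execute(self, tup):
        self.count += 1

    def finish_batch(self):
        return (self.txid, self.count)


class BatchRouter:
    """Hypothetical stand-in for the coordination Storm performs.

    Assumes it is told how many tuples each transaction contains, so it can
    tell when a task has received the whole batch.
    """

    def __init__(self, bolt_factory):
        self.bolt_factory = bolt_factory
        self.active = {}    # txid -> (bolt instance, tuples still expected)
        self.finished = []

    def start_batch(self, txid, expected):
        self.active[txid] = (self.bolt_factory(txid), expected)

    def deliver(self, txid, tup):
        bolt, remaining = self.active[txid]
        bolt.execute(tup)
        remaining -= 1
        if remaining == 0:
            self.finished.append(bolt.finish_batch())
            del self.active[txid]  # clean up accumulated state for this transaction
        else:
            self.active[txid] = (bolt, remaining)


router = BatchRouter(PartialCountBolt)
router.start_batch(1, 2)
router.start_batch(2, 3)
# Tuples from two in-flight transactions arrive interleaved.
for txid, tup in [(1, "a"), (2, "b"), (2, "c"), (1, "d"), (2, "e")]:
    router.deliver(txid, tup)
print(router.finished)  # [(1, 2), (2, 3)]
```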
Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like Kestrel can't do this. Apache Kafka is a perfect fit for this kind of spout, and storm-kafka contains a transactional spout implementation for Kafka.
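Why the source must support exact replay can be shown with a small Python sketch, assuming a list stands in for an append-only, offset-addressable log such as a Kafka partition. `ReplayableLogSpout` and `emit_batch` are hypothetical names, not the storm-kafka API. The first emission of a transaction records its batch metadata (start offset and size), and a replay reuses that metadata, so it re-emits exactly the same messages. A queue that removes messages once they are consumed could not offer this guarantee.

```python
class ReplayableLogSpout:
    """Hypothetical emitter over an append-only, offset-addressable log."""

    def __init__(self, log, batch_size):
        self.log = log              # list standing in for a Kafka-like partition
        self.batch_size = batch_size
        self.metadata = {}          # txid -> (start_offset, size); Storm keeps such metadata in Zookeeper
        self.next_offset = 0

    def emit_batch(self, txid):
        if txid not in self.metadata:  # first attempt: define the batch
            size = min(self.batch_size, len(self.log) - self.next_offset)
            self.metadata[txid] = (self.next_offset, size)
            self.next_offset += size
        start, size = self.metadata[txid]  # a replay reads the same offsets again
        return self.log[start:start + size]


spout = ReplayableLogSpout(list("abcdefg"), 3)
print(spout.emit_batch(1))  # ['a', 'b', 'c']
print(spout.emit_batch(2))  # ['d', 'e', 'f']
print(spout.emit_batch(1))  # replay of txid 1: ['a', 'b', 'c']
print(spout.emit_batch(3))  # ['g']
```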