前言:本篇关注Flink,对Fault Tolerance的源码实现进行阐述,主要介绍Api层及Flink现有实现。
本篇文章重点关注以下问题:
- 具备Fault Tolerance能力的两种对象:Function和Operator
-
分析两个接口,列举典型实现,并做简要分析
1. 具备Fault Tolerance能力的两种对象
- Function
- Operator
1.1 Function对象
org.apache.flink.api.common.functions.Function
所有用户自定义函数的基本接口,如已经预定义的FlatMapFunction就是基础自Function,Function并未定义任何方法,只是作为标识接口。
所有Function对象的Fault Tolerance都是通过继承CheckpointedFunction接口实现的,换话说,容错能力是Function的可选项,这点与Operator不同
1.2 Operator对象
org.apache.flink.streaming.api.operators.StreamOperator
所有Operator的基本接口,如已经预定义的StreamFilter、StreamFlatMap就是StreamOperator的实现。与Function是标识接口不同,StreamOperator内置了几个和检查点相关的接口方法,因此,在Operator中,容错能力是实现Operator的必选项,这点不难理解,因为Operator处于运行时时,诸如分区信息都是必要要做快照的。
2. CheckpointedFunction
org.apache.flink.streaming.api.checkpoint. CheckpointedFunction
CheckpointedFunction接口是有状态转换函数的核心接口,两个接口方法:
- initializeState:Function初始化的时候调用,一般用作初始化state数据结构。
- snapshotState:请求state快照时被调用,方法签名中的参数FunctionSnapshotContext可以获取此Function中的所有State信息(快照),通过该上下文,可以获取该Function之前变更所产生的最终结果。
2.1 FlinkKafkaProducerBase
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
方法签名:
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction |
FlinkKafkaConsumerBase是Flink实现基于Kafka的Source的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制。
关键代码:
/** Consumer从各topic partitions读取的初始offsets. */ private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets; /** 保存已消费的、但是Offset未提交至Broken或Zk的数据. */ private final LinkedMap pendingOffsetsToCommit = new LinkedMap(); /** * 如果程序从Checkpoint启动,此变量保存此Consumer上次消费的offset</br> * * <p>此变量主要由 {@link #initializeState(FunctionInitializationContext)} 进行赋值. * */ private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState; /** 在state backend上保存的State信息(Offset信息) . */ private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates; @Override public final void initializeState(FunctionInitializationContext context) throws Exception { OperatorStateStore stateStore = context.getOperatorStateStore(); // 兼容1.2.0版本的State,可无视 ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); // 各Partition的offset信息 this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}))); if (context.isRestored() && !restoredFromOldState) { restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); // 兼容1.2.0版本的State,可无视 for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) { restoredFromOldState = true; unionOffsetStates.add(kafkaOffset); } oldRoundRobinListState.clear(); if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { throw new IllegalArgumentException( "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x."); } // 将待恢复的State信息保存进‘restoredState’变量中,以便程序异常时用于恢复 for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) { restoredState.put(kafkaOffset.f0, kafkaOffset.f1); } LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else { LOG.info("No restore state for FlinkKafkaConsumer."); } } @Override public final void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("snapshotState() called on closed source"); } else { // 首先清空state backend对应offset的全局存储(State信息) unionOffsetStates.clear(); // KafkaServer的连接器,根据Kafka版本由子类实现 final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; if (fetcher == null) { // 连接器还未初始化,unionOffsetStates的值从 restored offsets 或是 subscribedPartition上读取 for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) { unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue())); } if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // 如果启用快照时同步提交Offset,则在初始化时,用restoredState给pendingOffsetsToCommit赋值 pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState); } } else { // 通过连接器获取当前消费的Offsets HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // 保存当前消费的Offset pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); } // 给state backend对应offset的全局存储(State信息)赋值 for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) { unionOffsetStates.add( Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); } } if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) { // pendingOffsetsToCommit的保护机制,最多存储100个元素,正也是此Map需要有序的原因 while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { pendingOffsetsToCommit.remove(0); } } } }
快照总结:
- initializeState方法从state backend中恢复State,并将相关信息保存入restoredState;
- snapshotState方法将当前准备放入state backend的state信息保存至unionOffsetStates,如果应用需要在快照的同时提交Offset,则将消费的Offset信息保存至pendingOffsetsToCommit。
相关推荐
《Apache Flink 1.5.4 源码解析》 Apache Flink 是一个用于处理无界和有界数据的开源流处理框架,它在大数据处理领域扮演着重要的角色。Flink 1.5.4 版本是其发展历史上的一个重要里程碑,提供了丰富的功能和优化。...
5. Checkpointing & Fault Tolerance:Flink 提供了基于状态的检查点机制,保证了系统在发生故障时能够恢复到一致性状态,实现了精确一次的语义。 三、Flink 的运行模式 Flink 可以在多种环境中运行,包括本地模式...
#### 5.1 Fault Tolerance演进之路 从Storm的RecordAcknowledgement模式到SparkStreaming的Microbatch模式,再到Google Cloud Dataflow的事务式模型,Flink借鉴并发展了这一系列容错机制。 #### 5.2 Checkpoint的...
- ** Fault Tolerance**:通过检查点(Checkpoint)和两个阶段提交协议,Flink 可以保证故障发生时的精确一次状态一致性。 4. **连接器与格式** - **Connectors**:Flink 支持多种数据源和接收器,如Kafka、HDFS...
Flink 的主要特点包括高性能、低延迟、灵活的流处理能力、 fault-tolerance、可扩展性和可配置性等。Flink 可以处理高效的大规模数据流,满足实时处理和.offline 处理的需求。 二、Flink demo 项目 Flink demo 项目...
同时,Flink的 Fault Tolerance机制通过状态快照和检查点确保了系统的高可用性,即使在节点故障的情况下也能恢复计算。 再者,Flink在数据集成方面表现出色,它可以与多种数据源和存储系统无缝对接,如Kafka、HDFS...
7. ** fault tolerance**:Flink通过状态快照和回溯实现容错,保证了高可用性。 为了更好地理解`TuikeSortProcessFunction.scala`和`TuikeSortMain.scala`中的代码,你需要熟悉上述Flink的核心概念,并结合实际代码...
6. ** fault tolerance**:Flink通过分布式快照实现容错,即使在集群部分节点失败的情况下,也能保证作业的连续运行。 7. **Exactly-once语义**:Flink支持精确一次的语义,保证在故障后恢复时不会丢失或重复处理...
* Fault Tolerance:阿里云对Flink的故障恢复机制进行了改进,使其具有更高的可用性。 * Flink SQL Improvements:阿里云对Flink的SQL支持进行了改进,使其具有更强的数据处理能力。 Flink是一种功能强大且灵活的流...
3. **FlinkExample中的源码解析** - **数据源(Source)**:FlinkExample通常会包含一个自定义的数据源,如SocketTextStreamSource,用于模拟或者读取实际生产环境中的数据流。 - **转换(Transformations)**:如map...
6. **容错机制(Fault Tolerance)**:Flink采用检查点和保存点的机制实现容错,保证在节点故障时能够恢复到一致的状态。 7. **部署与扩展性(Deployment & Scalability)**:Flink支持YARN、Kubernetes和...
- ** fault tolerance**:Flink通过容错设计,能够在任务失败后自动恢复,保证服务的高可用性。 - **Event Time**:Flink支持事件时间的概念,允许根据事件生成的时间进行处理,而非系统时间,这在处理乱序数据时尤...
5. Fault Tolerance:通过检查点和保存点机制,Flink能够提供精确一次的状态一致性,确保系统在故障后的恢复。 二、Flink Java API详解 1. 基本操作:使用Java API创建DataStream,可以使用addSource()方法从数据...
经典英文入门PPT_Stream Processing with Flink 2017...第四、Fault tolerance and correctness 第五、Performance: Low Latency & High Throughput 第六、Closing 简单,精辟,易于抓住重点理解学习,入门经典资料。
- ** fault tolerance**:Flink能够在节点故障后自动重新调度任务,确保系统的高可用性。 5. **性能优化** - **并行度调整**:根据硬件资源和任务需求,调整operator的并行度,提升处理速度。 - **Backpressure...
Storm 的解决方案包括分布式架构、 fault-tolerance 和高性能的处理能力。 知识点3: Fingerprint 和 share-split 机制 Fingerprint 和 share-split 机制是一种新的approach,用于提高流处理的可靠性。该机制通过对...
12. ** fault-tolerance**: 测试将检查 Flink 的容错机制,确保在故障发生后能够恢复到一致状态。 通过深入研究 "flinkTest-master" 仓库,我们可以获取更多关于如何在实际环境中测试和调试 Flink 应用程序的细节。...
- Checkpointing & Fault Tolerance:Flink通过周期性插入barrier执行checkpoint,确保在故障发生时可以从检查点快速恢复。 - Event Time Processing:Flink基于事件时间的处理模型,允许在乱序数据流中保证正确的...
4. 容错性(Fault Tolerance):考虑到硬件和网络的不稳定性,系统应能自动处理错误和恢复。 5. 数据一致性(Data Consistency):保证数据在处理过程中的准确性和完整性。 6. 安全性(Security):确保数据在传输...