`

Flink1.4 Fault Tolerance源码解析

阅读更多

前言:本篇关注Flink,对Fault Tolerance的源码实现进行阐述,主要介绍Api层及Flink现有实现

本篇文章重点关注以下问题:

  • 具备Fault Tolerance能力的两种对象:Function和Operator
  • 分析两个接口,列举典型实现,并做简要分析

1. 具备Fault Tolerance能力的两种对象

  • Function
  • Operator

1.1 Function对象

       org.apache.flink.api.common.functions.Function

     所有用户自定义函数的基本接口,如已经预定义的FlatMapFunction就是基础自Function,Function并未定义任何方法,只是作为标识接口。

     所有Function对象的Fault Tolerance都是通过继承CheckpointedFunction接口实现的,换话说,容错能力是Function的可选项,这点与Operator不同

1.2 Operator对象

         org.apache.flink.streaming.api.operators.StreamOperator

        所有Operator的基本接口,如已经预定义的StreamFilter、StreamFlatMap就是StreamOperator的实现。

与Function是标识接口不同,StreamOperator内置了几个和检查点相关的接口方法,因此,在Operator中,容错能力是实现Operator的必选项,这点不难理解,因为Operator处于运行时时,诸如分区信息都是必要要做快照的。

 2. CheckpointedFunction

         org.apache.flink.streaming.api.checkpoint. CheckpointedFunction

 

 CheckpointedFunction接口是有状态转换函数的核心接口,两个接口方法:

  • initializeState:Function初始化的时候调用,一般用作初始化state数据结构。
  • snapshotState:请求state快照时被调用,方法签名中的参数FunctionSnapshotContext可以获取此Function中的所有State信息(快照),通过该上下文,可以获取该Function之前变更所产生的最终结果。

2.1 FlinkKafkaProducerBase

       org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase


     
方法签名

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements

           CheckpointListener,

           ResultTypeQueryable<T>,

          CheckpointedFunction

      FlinkKafkaConsumerBaseFlink实现基于KafkaSource的基类,Kafka提供基于offset并且可重复消费的机制,使其非常容易实现Fault Tolerance机制。

关键代码:

/** Consumer从各topic partitions读取的初始offsets. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;

/** 保存已消费的、但是Offset未提交至Broken或Zk的数据. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();

/**
 * 如果程序从Checkpoint启动,此变量保存此Consumer上次消费的offset</br>
 * 
 * <p>此变量主要由 {@link #initializeState(FunctionInitializationContext)} 进行赋值.
 *
 */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

/** 在state backend上保存的State信息(Offset信息) . */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {

	OperatorStateStore stateStore = context.getOperatorStateStore();
	
	// 兼容1.2.0版本的State,可无视
	ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
		stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

	// 各Partition的offset信息
	this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
			OFFSETS_STATE_NAME,
			TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));

	if (context.isRestored() && !restoredFromOldState) {
		restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

		// 兼容1.2.0版本的State,可无视
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
			restoredFromOldState = true;
			unionOffsetStates.add(kafkaOffset);
		}
		oldRoundRobinListState.clear();

		if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
			throw new IllegalArgumentException(
				"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
		}

		// 将待恢复的State信息保存进‘restoredState’变量中,以便程序异常时用于恢复
		for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
			restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
		}

		LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
	} else {
		LOG.info("No restore state for FlinkKafkaConsumer.");
	}
}

@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
	if (!running) {
		LOG.debug("snapshotState() called on closed source");
	} else {
		// 首先清空state backend对应offset的全局存储(State信息)
		unionOffsetStates.clear();

		// KafkaServer的连接器,根据Kafka版本由子类实现
		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			// 连接器还未初始化,unionOffsetStates的值从 restored offsets 或是 subscribedPartition上读取
			for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
				unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
			}

			if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
				// 如果启用快照时同步提交Offset,则在初始化时,用restoredState给pendingOffsetsToCommit赋值
				pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
			}
		} else {
			// 通过连接器获取当前消费的Offsets
			HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

			if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
				// 保存当前消费的Offset
				pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
			}

			// 给state backend对应offset的全局存储(State信息)赋值
			for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
				unionOffsetStates.add(
						Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
			}
		}

		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
			// pendingOffsetsToCommit的保护机制,最多存储100个元素,正也是此Map需要有序的原因
			while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
				pendingOffsetsToCommit.remove(0);
			}
		}
	}
}

快照总结:

  • initializeState方法从state backend中恢复State,并将相关信息保存入restoredState;
  • snapshotState方法将当前准备放入state backend的state信息保存至unionOffsetStates,如果应用需要在快照的同时提交Offset,则将消费的Offset信息保存至pendingOffsetsToCommit。
        FlinkKafkaConsumerBase继承了CheckpointListener接口,此接口是一个监听接口,以便当快照完成时通知Function进行一些必要处理;FlinkKafkaConsumerBase借用此接口来提交Offset,代码如下:
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
	if (!running) {
		LOG.debug("notifyCheckpointComplete() called on closed source");
		return;
	}

	final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
	if (fetcher == null) {
		LOG.debug("notifyCheckpointComplete() called on uninitialized source");
		return;
	}

	if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
		try {
			// 在pendingOffsetsToCommit中找出checkpointId对应的offset信息
			final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
			if (posInMap == -1) {
				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
				return;
			}

			@SuppressWarnings("unchecked")
			// 取出checkpointId对应的Offset信息
			Map<KafkaTopicPartition, Long> offsets =
				(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

			// 将该checkpointId之前的Offset信息移除(pendingOffsetsToCommit有序的原因)
			for (int i = 0; i < posInMap; i++) {
				pendingOffsetsToCommit.remove(0);
			}

			if (offsets == null || offsets.size() == 0) {
				LOG.debug("Checkpoint state was empty.");
				return;
			}

			// 通过连接器向Broken或Zk提交Offset信息
			fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
		} catch (Exception e) {
			if (running) {
				throw e;
			}
		}
	}
}

 2.2 其他实现

   因项目接Kafka,故只研究了KafkaConsumerFunction的容错处理实现,其他诸如StatefulSequenceSource、MessageAcknowledgingSourceBase实现类似。

3. StreamOperator

   org.apache.flink.streaming.api.operators.StreamOperator

   
   
StreamOperator内置了我们上面谈到的几个跟检查点相关的接口方法:

  • initializeState
  • snapshotState
  • notifyOfCompletedCheckpoint

        正因为快照相关方法都已内置在StreamOperator这个顶层接口中,所以operator中快照机制由可选项变成了必选项。

        这里需要注意的是snapshotState方法,它返回值为OperatorSnapshotResult。它是一个可以存储四种State类型的容器:

  • keyedStateManagedFuture
  • keyedStateRawFuture
  • operatorStateManagedFuture
  • operatorStateRawFuture
  • 有关四种State类型不是本节重点,可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html

       下面以Flink内置的一个Operator(StreamFlatMap)为切入点,介绍一些常用类。

  

3.1 AbstractStreamOperator

         org.apache.flink.streaming.api.operators.AbstractStreamOperator

   AbstractStreamOperatorStreamOperator的抽象类,为operator的实现提供模板,当然也为以上的三个跟快照相关的接口方法的实现提供了模板。

3.2 AbstractUdfStreamOperator

         org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator

       该抽象类继承自AbstractStreamOperator,用于进一步为operator的实现提供模板,不过从类名可以看出来,它主要是为用户定义函数(udf)operator提供模板。

值得注意的是,方法snapshotState中,有如下代码:

 

if (userFunction instanceof CheckpointedFunction) {
	((CheckpointedFunction) userFunction).snapshotState(context);
	return true;
}
          Operator中出现了CheckpointedFunction,这是因为function只是静态的函数,它的运行还必须借助于operator,因此其状态也必须借助于operator来帮助其与Flink的运行时交互以达到最终的持久化的目的。

3.3 StreamFlatMap

StreamFlatMap代码较为简单,专注于使用FlatMap对应的Function实现业务逻辑。

 

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
	collector.setTimestamp(element);
	userFunction.flatMap(element.getValue(), collector);
}

 4. Function和StreamOperator之间的关联

       观察AbstractUdfStreamOperator的构造函数:

 

public AbstractUdfStreamOperator(F userFunction) {
	this.userFunction = requireNonNull(userFunction);
	checkUdfCheckpointingPreconditions();
}

         可以发现,所有UDFOperator都内嵌了对应的Function,这是因为Function仅仅是一个静态的函数,其真正需要发挥作用依赖于Operator,以便在Flink运行时进行交互达到持久化目的。

 小结

本篇剖析了Flink针对Function以及Operator如何做快照以及如何恢复的实现。虽然,还没有涉及到fault tolerance的最终实现机制,但是这是我们的入口。

 

  • 大小: 5.2 KB
  • 大小: 17.8 KB
  • 大小: 10.9 KB
  • 大小: 16.7 KB
分享到:
评论

相关推荐

    flink-1.5.4-src.tgz

    《Apache Flink 1.5.4 源码解析》 Apache Flink 是一个用于处理无界和有界数据的开源流处理框架,它在大数据处理领域扮演着重要的角色。Flink 1.5.4 版本是其发展历史上的一个重要里程碑,提供了丰富的功能和优化。...

    flink-1.13.6-bin-scala_2.11.tgz

    5. Checkpointing & Fault Tolerance:Flink 提供了基于状态的检查点机制,保证了系统在发生故障时能够恢复到一致性状态,实现了精确一次的语义。 三、Flink 的运行模式 Flink 可以在多种环境中运行,包括本地模式...

    透过源码看懂Flink核心框架的执行流程.pdf

    #### 5.1 Fault Tolerance演进之路 从Storm的RecordAcknowledgement模式到SparkStreaming的Microbatch模式,再到Google Cloud Dataflow的事务式模型,Flink借鉴并发展了这一系列容错机制。 #### 5.2 Checkpoint的...

    flink-1.7.zip

    - ** Fault Tolerance**:通过检查点(Checkpoint)和两个阶段提交协议,Flink 可以保证故障发生时的精确一次状态一致性。 4. **连接器与格式** - **Connectors**:Flink 支持多种数据源和接收器,如Kafka、HDFS...

    flink技术参考手册

    Flink 的主要特点包括高性能、低延迟、灵活的流处理能力、 fault-tolerance、可扩展性和可配置性等。Flink 可以处理高效的大规模数据流,满足实时处理和.offline 处理的需求。 二、Flink demo 项目 Flink demo 项目...

    基于flink的分布式数据分析系统.zip

    同时,Flink的 Fault Tolerance机制通过状态快照和检查点确保了系统的高可用性,即使在节点故障的情况下也能恢复计算。 再者,Flink在数据集成方面表现出色,它可以与多种数据源和存储系统无缝对接,如Kafka、HDFS...

    flink代码学习求通过

    7. ** fault tolerance**:Flink通过状态快照和回溯实现容错,保证了高可用性。 为了更好地理解`TuikeSortProcessFunction.scala`和`TuikeSortMain.scala`中的代码,你需要熟悉上述Flink的核心概念,并结合实际代码...

    flink样例完整代码

    6. ** fault tolerance**:Flink通过分布式快照实现容错,即使在集群部分节点失败的情况下,也能保证作业的连续运行。 7. **Exactly-once语义**:Flink支持精确一次的语义,保证在故障后恢复时不会丢失或重复处理...

    藏经阁-Flink Forward China 2018——Redefining Computation.pdf

    * Fault Tolerance:阿里云对Flink的故障恢复机制进行了改进,使其具有更高的可用性。 * Flink SQL Improvements:阿里云对Flink的SQL支持进行了改进,使其具有更强的数据处理能力。 Flink是一种功能强大且灵活的流...

    FlinkExample.rar

    3. **FlinkExample中的源码解析** - **数据源(Source)**:FlinkExample通常会包含一个自定义的数据源,如SocketTextStreamSource,用于模拟或者读取实际生产环境中的数据流。 - **转换(Transformations)**:如map...

    Java基于Flink的车联网实时数据平台源码.zip

    6. **容错机制(Fault Tolerance)**:Flink采用检查点和保存点的机制实现容错,保证在节点故障时能够恢复到一致的状态。 7. **部署与扩展性(Deployment & Scalability)**:Flink支持YARN、Kubernetes和...

    FlinkProj-master.zip

    - ** fault tolerance**:Flink通过容错设计,能够在任务失败后自动恢复,保证服务的高可用性。 - **Event Time**:Flink支持事件时间的概念,允许根据事件生成的时间进行处理,而非系统时间,这在处理乱序数据时尤...

    flink-study:Flink学习

    5. Fault Tolerance:通过检查点和保存点机制,Flink能够提供精确一次的状态一致性,确保系统在故障后的恢复。 二、Flink Java API详解 1. 基本操作:使用Java API创建DataStream,可以使用addSource()方法从数据...

    经典英文入门PPT_Stream Processing with Flink 2017(QCon London)_Robert Metzger

    经典英文入门PPT_Stream Processing with Flink 2017...第四、Fault tolerance and correctness 第五、Performance: Low Latency & High Throughput 第六、Closing 简单,精辟,易于抓住重点理解学习,入门经典资料。

    myflink:我的flink练习项目

    - ** fault tolerance**:Flink能够在节点故障后自动重新调度任务,确保系统的高可用性。 5. **性能优化** - **并行度调整**:根据硬件资源和任务需求,调整operator的并行度,提升处理速度。 - **Backpressure...

    藏经阁-Improved Reliable StreamingPro.pdf

    Storm 的解决方案包括分布式架构、 fault-tolerance 和高性能的处理能力。 知识点3: Fingerprint 和 share-split 机制 Fingerprint 和 share-split 机制是一种新的approach,用于提高流处理的可靠性。该机制通过对...

    flinkTest

    12. ** fault-tolerance**: 测试将检查 Flink 的容错机制,确保在故障发生后能够恢复到一致状态。 通过深入研究 "flinkTest-master" 仓库,我们可以获取更多关于如何在实际环境中测试和调试 Flink 应用程序的细节。...

    快手基于 Apache Flink 的优化实践

    - Checkpointing & Fault Tolerance:Flink通过周期性插入barrier执行checkpoint,确保在故障发生时可以从检查点快速恢复。 - Event Time Processing:Flink基于事件时间的处理模型,允许在乱序数据流中保证正确的...

    实时流数据平台架构实践共16页.pdf.zip

    4. 容错性(Fault Tolerance):考虑到硬件和网络的不稳定性,系统应能自动处理错误和恢复。 5. 数据一致性(Data Consistency):保证数据在处理过程中的准确性和完整性。 6. 安全性(Security):确保数据在传输...

Global site tag (gtag.js) - Google Analytics