本文根据陈肃老师在 Apache Kafka x Flink Meetup 深圳站的分享整理而成,文章首先将从数据融合角度,谈一下 DataPipeline 对批流一体架构的看法,以及如何设计和使用一个基础框架。其次,数据的一致性是进行数据融合时最基础的问题。如果数据无法实现一致,即使同步再快,支持的功能再丰富,都没有意义。
另外,DataPipeline 目前使用的基础框架为 Kafka Connect。为实现一致性的语义保证,我们做了一些额外工作,希望对大家有一定的参考意义。
最后,会提一些我们在应用 Kafka Connect 框架时,遇到的一些现实的工程问题,以及应对方法。尽管大家的场景、环境和数据量级不同,但也有可能会遇到这些问题。希望对大家的工作有所帮助。
一、批流一体架构
批和流是数据融合的两种应用形态
下图来自 Flink 官网。传统的数据融合通常基于批模式。在批的模式下,我们会通过一些周期性运行的 ETL JOB,将数据从关系型数据库、文件存储向下游的目标数据库进行同步,中间可能有各种类型的转换。
另一种是 Data Pipeline 模式。与批模式相比相比, 其最核心的区别是将批量变为实时:输入的数据不再是周期性的去获取,而是源源不断的来自于数据库的日志、消息队列的消息。进而通过一个实时计算引擎,进行各种聚合运算,产生输出结果,并且写入下游。
现代的一些处理框架,包括 Flink、Kafka Streams、Spark,或多或少都能够支持批和流两种概念。只不过像 Kafka,其原生就是为流而生,所以如果基于 Kafka Connect 做批流一体,你可能需要对批量的数据处理做一些额外工作,这是我今天重点要介绍的。
数据融合的基本问题
如果问题简化到你只有一张表,可能是一张 MySQL 的表,里面只有几百万行数据,你可能想将其同步到一张 Hive 表中。基于这种情况,大部分问题都不会遇到。因为结构是确定的,数据量很小,且没有所谓的并行化问题。
但在一个实际的企业场景下,如果做一个数据融合系统,就不可避免要面临几方面的挑战:
第一,“动态性”
数据源会不断地发生变化,主要归因于:表结构的变化,表的增减。针对这些情况,你需要有一些相应的策略进行处理。
第二,“可伸缩性”
任何一个分布式系统,必须要提供可伸缩性。因为你不是只同步一张表,通常会有大量数据同步任务在进行着。如何在一个集群或多个集群中进行统一的调度,保证任务并行执行的效率,这是一个要解决的基本问题。
第三,“容错性”
在任何环境里你都不能假定服务器是永远在正常运行的,网络、磁盘、内存都有可能发生故障。这种情况下一个 Job 可能会失败,之后如何进行恢复?状态能否延续?是否会产生数据的丢失和重复?这都是要考虑的问题。
第四,“异构性”
当我们做一个数据融合项目时,由于源和目的地是不一样的,比如,源是 MySQL,目的地是 Oracle,可能它们对于一个字段类型定义的标准是有差别的。在同步时,如果忽略这些差异,就会造成一系列的问题。
第五,“一致性”
一致性是数据融合中最基本的问题,即使不考虑数据同步的速度,也要保证数据一致。数据一致性的底线为:数据先不丢,如果丢了一部分,通常会导致业务无法使用;在此基础上更好的情况是:源和目的地的数据要完全一致,即所谓的端到端一致性,如何做到呢?
Lambda 架构是批流一体化的必然要求
目前在做这样的平台时,业界比较公认的有两种架构:一种是 Lambda 架构,Lambda 架构的核心是按需使用批量和流式的处理框架,分别针对批式和流式数据提供相应的处理逻辑。最终通过一个服务层进行对外服务的输出。
为什么我们认为 Lambda 架构是批流一体化的必然要求?这好像看起来是矛盾的(与之相对,还有一种架构叫 Kappa 架构,即用一个流式处理引擎解决所有问题)。
实际上,这在很大程度来自于现实中用户的需求。DataPipeline 在刚刚成立时只有一种模式,只支持实时流同步,在我们看来这是未来的一种趋势。
但后来发现,很多客户实际上有批量同步的需求。比如,银行在每天晚上可能会有一些月结、日结,证券公司也有类似的结算服务。基于一些历史原因,或出于对性能、数据库配置的考虑,可能有的数据库本身不能开 change log。所以实际上并不是所有情况下都能从源端获取实时的流数据。
考虑到上述问题,我们认为一个产品在支撑数据融合过程中,必须能同时支撑批量和流式两种处理模式,且在产品里面出于性能和稳定性考虑提供不同的处理策略,这才是一个相对来说比较合理的基础架构。
数据融合的 Ad-Hoc 模式
具体到做这件事,还可以有两种基础的应用模式。假如我需要将数据从 MySQL 同步到 Hive,可以直接建立一个 ETL 的 JOB(例如基于 Flink),其中封装所有的处理逻辑,包括从源端读取数据,然后进行变换写入目的地。在将代码编译好以后,就可以放到 Flink 集群上运行,得到想要的结果。这个集群环境可以提供所需要的基础能力,刚才提到的包括分布式,容错等。
数据融合的 MQ 模式
另一种模式是 ETL JOB 本身输入输出实际上都是面对消息队列的,实际上这是现在最常使用的一种模式。在这种模式下,需要通过一些独立的数据源和目的地连接器,来完成数据到消息队列的输入和输出。ETL JOB 可以用多种框架实现,包括 Flink、Kafka Streams 等,ETL JOB 只和消息队列发生数据交换。
DP 选择 MQ 模式的理由
DataPipeline 选择 MQ 模式,主要有几点考虑:
第一,在我们产品应用中有一个非常常见的场景:要做数据的一对多分发。数据要进行一次读取,然后分发到各种不同的目的地,这是一个非常适合消息队列使用的分发模型。
第二,有时会对一次读取的数据加不同的处理逻辑,我们希望这种处理不要重新对源端产生一次读取。所以在多数情况下,都需将数据先读到消息队列,然后再配置相应的处理逻辑。
第三,Kafka Connect 就是基于 MQ 模式的,它有大量的开源连接器。基于 Kafka Connect 框架,我们可以重用这些连接器,节省研发的投入。
第四,当你把数据抽取跟写入目的地,从处理逻辑中独立出来之后,便可以提供更强大的集成能力。因为你可以在消息队列上集成更多的处理逻辑,而无需考虑重新写整个 Job。
相应而言,如果你选择将 MQ 作为所有 JOB 的传输通道,就必须要克服几个缺点:
第一,所有数据的吞吐都经过 MQ,所以 MQ 会成为一个吞吐瓶颈。
第二,因为是一个完全的流式架构,所以针对批量同步,你需要引入一些边界消息来实现一些批量控制。
第三,Kafka 是一个有持久化能力的消息队列,这意味着数据留存是有极限的。比如,你将源端的读到 Kafka Topic 里面,Topic 不会无限的大,有可能会造成数据容量超限,导致一些数据丢失。
第四,当批量同步在中间因为某种原因被打断,无法做续传时,你需要进行重传。在重传过程中,首先要将数据进行清理,如果基于消息队列模式,清理过程就会带来额外的工作。你会面临两个困境:要么清空原有的消息队列,要么你创造新的消息队列。这肯定不如像直接使用一些批量同步框架那样来的直接。
二、一致性语义保证
用户需求
先简单介绍一下用户对于数据同步方面的一些基本要求:
第一种需求,批量同步需要以一种事务性的方式完成同步
无论是同步一整块的历史数据,还是同步某一天的增量,该部分数据到目的地,必须是以事务性的方式出现的。而不是在同步一半时,数据就已经在目的地出现了,这可能会影响下游的一些计算逻辑。
第二种需求,流式数据尽可能快的完成同步
大家都希望越快越好,但相应的,同步的越快,吞吐量有可能因为你的参数设置出现相应的下降,这可能需要有一个权衡。
第三种需求,批量和流式可能共存于一个 JOB
作为一个数据融合产品,当用户在使用DataPipeline时,通常需要将存量数据同步完,后面紧接着去接增量。然后存量与增量之间需要进行一个无缝切换,中间的数据不要丢、也不要多。
**第四种需求,按需灵活选择一致性语义保证
**
DataPipeline 作为一个产品,在客户的环境中,我们无法对客户数据本身的特性提出强制要求。我们不能要求客户数据一定要有主键或者有唯一性的索引。所以在不同场景下,对于一致性语义保证,用户的要求也不一样的:
比如在有主键的场景下,一般我们做到至少有一次就够了,因为在下游如果对方也是一个类似于关系型数据库这样的目的地,其本身就有去重能力,不需要在过程中间做一个强一致的保证。但是,如果其本身没有主键,或者其下游是一个文件系统,如果不在过程中间做额外的一致性保证,就有可能在目的地产生多余的数据,这部分数据对于下游可能会造成非常严重的影响。
数据一致性的链路视角
如果要解决端到端的数据一致性,我们要处理好几个基本环节:
**第一,在源端做一个一致性抽取
**
一致性抽取是什么含义?即当数据从通过数据连接器写入到 MQ 时,和与其对应的 offset 必须是以事务方式进入 MQ 的。
第二,一致性处理
如果大家用过 Flink,Flink 提供了一个端到端一致性处理的能力,它是内部通过 checkpoint 机制,并结合 Sink 端的二阶段提交协议,实现从数据读取处理到写入的一个端到端事务一致性。其它框架,例如 Spark Streaming 和 Kafka Streams 也有各自的机制来实现一致性处理。
第三,一致性写入
在 MQ 模式下,一致性写入,即 consumer offset 跟实际的数据写入目的时,必须是同时持久化的,要么全都成功,要么全部失败。
第四,一致性衔接
在 DataPipeline 的产品应用中,历史数据与实时数据的传输有时需要在一个任务中共同完成。所以产品本身需要有这种一致性衔接的能力,即历史数据和流式数据,必须能够在一个任务中,由程序自动完成它们之间的切换。
Kafka Connect 的一致性保证
Kafka Connect 如何保证数据同步的一致性?就目前版本,Kafka Connect 只能支持端到端的 at least once,核心原因在于,在 Kafka Connect 里面,其 offset 的持久化与数据发送本身是异步完成的。这在很大程度上是为了提高其吞吐量考虑,但相应产生的问题是,如果使用 Kafka Connect,框架本身只能为你提供 at least once 的语义保证。
在该模式下,如果没有通过主键或下游应用进行额外地去重,同步过程当中的数据会在极端情况下出现重复,比如源端发送出一批数据已经成功,但 offset 持久化失败了,这样在任务恢复之后,之前已经发送成功的数据会再次重新发送一批,而下游对这种现象完全是不知情的。目的端也是如此,因为 consumer 的 offset 也是异步持久化,就会到导致有可能数据已经持久化到 Sink,但实际上 consumer offset 还没有推进。这是我们在应用原生的 Kafka Connect 框架里遇到最大的两个问题。
三、DP 的解决之道
二阶段提交协议
DataPipeline 如何解决上述问题?首先,需要用协议的方式保证每一步都做成事务。一旦做成事务,由于每个环节都是解耦的,其最终数据就可以保证一致性。下图为二阶段提交协议的最基础版本,接下来为大家简单介绍一下。
首先,在二阶段提交协议中,对于分布式事务的参与方,在 DataPipeline 的场景下为数据写入与 offset 写入,这是两个独立组件。两者之间的写入操作由 Coordinator 进行协调。第一步是一个 prepare 阶段,每一个参与方会将数据写入到自己的目的地,具体持久化的位置取决于具体应用的实现。
第二步,当 prepare 阶段完成之后,Coordinator 会向所有参与者发出 commit 指令,所有参与者在完成 commit 之后,会发出一个 ack,Coordinator 收到 ack 之后,事务就完成了。如果出现失败,再进行相应的回滚操作。其实在分布式数据库的设计领域中,单纯应用一个二阶段提交协议会出现非常多的问题,例如 Coordinator 本身如果不是高可用的,在过程当中就有可能出现事务不一致的问题。
所以应用二阶段提交协议,最核心的问题是如何保证 Coordinator 高可用。所幸在大家耳熟能详的各种框架里,包括 Kafka 和 Flink,都能够通过分布式一致协议实现 Coordinator 高可用,这也是为什么我们能够使用二阶段提交来保证事务性。
Kafka 事务消息原理
关于 Kafka 事务消息原理,网上有很多资料,在此简单说一下能够达到的效果。Kafka 通过二阶段提交协议,最终实现了两个最核心的功能。
第一,一致性抽取
上文提到数据要被发送进 Kafka,同时 offset 要被持久化到 Kafka,这是对两个不同 Topic 的写入。通过利用 Kafka 事务性消息,我们能够保证 offset 的写入和数据的发送是一个事务。如果 offset 没有持久化成功,下游是看不到这批数据的,这批数据实际上最终会被丢弃掉。
所以对于源端的发送,我们对 Kafka Connect 的 Source Worker 做了一些改造,让其能够提供两种模式,如果用户的数据本身是具备主键去重能力的,就可以继续使用 Kafka Connect 原生的模式。
如果用户需要强一致时,首先要开启一个源端的事务发送功能,这就实现了源端的一致性抽取。其可以保证数据进 Kafka 一端不会出现数据重复。这里有一个限制,即一旦要开启一致性抽取,根据 Kafka 必须要将 ack 设置成 all,这意味着一批数据有多少个副本,其必须能够在所有的副本所在的 broker 都已经应答的情况下,才可以开始下一批数据的写入。尽管会造成一些性能上的损失,但为了实现强一致,你必须要接受这一事实。
**第二,一致性处理
**
事务性消息最早就是为 Kafka Streams 设计和准备的。可以写一段 Kafka Streams 应用,从 Kafka 里读取数据,然后完成转化逻辑,进而将结果再输出回 Kafka。Sink 端再从 Kafka 中消费数据,写入目的地。
数据一致性写入
之前简要谈了一下二阶段提交协议的原理,DataPipeline 实现的方式不算很深奥,基本是业界的一种统一方式。其中最核心的点是,我们将 consumer offset 管理从 Kafka Connect 框架中独立出来,实现事务一致性提交。另外,在 Sink 端封装了一个类似于 Flink 的 TwoPhaseCommitSinkFunction 方式,其定义了 Sink 若要实现一个二阶段提交所必须要实现的一些功能。
DataPipeline 将 Sink Connector 分为两类,一类是 Connector 本身具备了事务能力,比如绝大部分的关系型数据库,只需将 offset 跟数据同时持久化到目的地即可。额外的可能需要有一张 offset 表来记录提交的 offset。还有一类 Sink 不具备事务性能力,类似像 FTP、OSS 这些对象存储,我们需要去实现一个二阶段提交协议,最终才能保证 Sink 端的数据能够达到一致性写入。
数据一致性衔接
关于批量数据与实时数据如何衔接的问题,主要有两个关键点:
第一,当开始进行一个批量数据同步时,以关系型数据库为例,你应该拿到当时一个整体数据的 Snapshot,并在一个事务中同时记录当时对应的日志起始值。以 MySQL 为例,当要获取一个 Binlog 起始偏移量时,需要开启一个 START TRANSACTION WITH CONSISTENT SNAPSHOT,这样才能保证完成全量之后,后期的读取增量日志同步不会产生重复数据。
第二,如果采用增量同步模式,则必须根据实际的数据业务领域,采用一种比较灵活的增量表达式,才能避免读到写到一半的数据。比如在你的数据中,其 ID 是一个完全自增,没有任何重复的可能,此时只需每次单纯的大于上一次同步的最后一条记录即可。
但如果是一个时间戳,无论精度多高,都有可能在数据库产生相同的时间戳,所以安全的做法是每次迭代时,取比当前时间稍微少一点,保证留出一个安全时间,比如五秒甚至一分钟,这样你永远不会读到一些时间戳可能会产生冲突的这部分数据,避免遗漏数据。这是一个小技巧,但如果没有注意,在使用过程中就会产生各种各样的问题。
还有一点是上面提及的,如何能够在一个流式框架实现批量同步的一致性,对于所有的流式框架,需要引入一些边界条件来标志着一次批量同步的开始和结束。DataPipeline 在每次批量发送开始和结束后,会引入一些控制量信号,然后在 Sink端进行相应处理。同样为了保证事务一致性,在 Sink 端处理这种批量同步时,依然要做一些类似于二阶段提交这样的方式,避免在一些极端情况下出现数据不一致的问题。
四、问题和思考
上文介绍的是 DataPipeline 如何基于 Kafka Connect 做事务同步一致性的方案。
DataPipeline 在使用 Kafka Connect 过程中遇到过一些问题,目前大部分已经有一些解决方案,还有少量问题,可能需要未来采用新的方法/框架才能够更好的解决。
第一,反压的问题
Kafka Connect 设计的逻辑是希望实现源端和目的端完全解耦,这种解偶本身是一个很好的特性。但也带来一些问题,源和目的地的 task 完全不知道彼此的存在。刚才我提到 Kafka 有容量限制,不能假定在一个客户环境里面,会给你无限的磁盘来做缓冲。通常我们在客户那边默认 Topic 为 100G 的容量。如果源端读的过快,大量数据会在 Kafka 里堆积,目的端没有及时消费,就有可能出现数据丢失,这是一个非常容易出现的问题。
怎么解决?DataPipeline 作为一个产品,在 Kafka Connect 之上,做了控制层,控制层中有像 Manager 这样的逻辑组件,会监控每一个 Topic 消费的 lag,当达到一定阈值时,会对源端进行限速,保证源和目的地尽可能匹配。
第二,资源隔离
Connect Worker 集群无法对 task 进行资源预留,多个 task 并行运行会相互影响。Worker 的 rest 接口是队列式的,单个集群任务过多会导致启停缓慢。
我们正在考虑利用外部的资源调度框架,例如 K8s 进行 worker 节点管理;以及通过路由规则将不同优先级任务运行在不同的 worker 集群上,实现预分配和共享资源池的灵活配置。
第三,Rebalance
在 2.3 版本以前,Kafka Connect 的 task rebalance 采用 stop-the-world 模式,牵一发动全身。在 2.3 版本之后,已经做了非常大优化,改为了具有粘性的 rebalance。所以如果使用 Kafka Connect,强烈推荐一定要升级到 2.3 以上的版本,也就是目前的最新版本。
五、未来演进路线
基于 MQ 模式的架构,针对大批量数据的同步,实际上还是容易出现性能瓶颈。主要瓶颈是在 MQ 的集群,我们并不能在客户环境里无限优化 Kafka 集群的性能,因为客户提供的硬件资源有限。所以一旦客户给定了硬件资源,Kafka 吞吐的上限就变为一个固定值。所以针对批量数据的同步,可能未来会考虑用内存队列替代 MQ。
同时,会采用更加灵活的 Runtime,主要是为了解决刚才提到的预分配资源池和共享资源池的统一管理问题。
另外,关于数据质量管理,实际上金融类客户对数据质量的一致性要求非常高。所以对于一些对数据质量要求非常高的客户,我们考虑提供一些后校验功能,尤其是针对批量同步。
本文作者:陈肃
原文链接:https://yq.aliyun.com/articles/719759?utm_content=g_1000079637
本文为云栖社区原创内容,未经允许不得转载。
分享到:
相关推荐
为了更好地融合离线与实时数仓的优点,构建流批一体的近实时数仓,可以考虑以下几点: 1. **统一SQL表达**: - 设计一套统一的SQL语法,使得无论是处理流式还是批量数据,都能够使用相同的查询语言进行操作。 2....
2. **流批一体**:由于Delta Lake支持批处理和流处理,结合Structured Streaming,可以构建流批一体的系统,统一处理批量和实时数据。 3. **高可用与容错**:Structured Streaming的容错机制与Delta Lake的事务保证...
4. 数据架构流批一体:打造一个既能支持低延迟又能满足高吞吐要求的融合型计算引擎,实现大规模复杂实时计算能力。数据中台:融入 AI、BI 技术,构建数据中台架构,通过打破数据壁垒,实现全行数据资产统一加工,...
此外,单任务故障恢复(Single Task Failover)和基于Offset的重放、去重机制,确保了不同一致性语义下的数据准确性和系统鲁棒性。 总结来说,Flink Shuffle 3.0的愿景是打造一个高性能、稳定、云原生且自适应的...
为了解决Lambda架构中的计算逻辑问题,**流批融合**的计算引擎如Spark和Flink应运而生,它们支持统一的处理引擎,实现流批一体,提供exactly once语义和事件时间窗口化。 **Kappa架构**则进一步简化了Lambda,仅...
3. 多元异构节点管理:这个部分允许注册和管理不同类型的实时数据融合节点,通过限制配置控制对数据节点的访问和操作规则,而策略配置则规定了语义映射和多节点降级顺序,确保数据融合过程的顺畅。 4. 动态均衡资源...
相比之下,Flink作为Blink的基础,其流批一体化计算引擎和强大的一致性保证,使得Blink在流处理和批处理场景下都表现出色,且与Flink的API和生态系统保持兼容。 总的来说,Blink计算引擎以其一体化的Table API、...
5. **Exactly-once语义**:Flink 1.13版本中,其支持的Exactly-once语义确保了在系统故障或异常情况下的数据一致性,这对于金融、电商等对数据准确性要求极高的领域尤为重要。 6. **Checkpoints与Savepoints**:...
因此,Ganos采用平台即服务(PaaS)的架构思想,结合多模融合和计算下推技术,以提高处理效率和事务一致性。 Ganos的架构设计注重将计算下推到数据层,减少中间件的使用,同时利用云原生特性实现资源的池化和弹性...
Flink CDC 的技术优势包括:无锁读取、并发读取、断点续传、丰富数据源支持、Exactly-once 语义保障数据一致性、增量快照读取、异构数据源融合、端到端一致性和 Flink 生态集成。这些技术优势使得 Flink CDC 成为...
- **高可用性和语义保障**:提供Exactly Once语义保证,确保数据处理的准确性和一致性。 - **云原生部署**:支持Kubernetes等云基础设施,方便用户进行研发部署。 #### 三、流图引擎架构与原理 TuGraph Analytics...
3. **强一致性的状态管理**: Blink支持Exactly-once语义,确保在异常情况下结果的准确性。 4. **灵活的窗口机制**: 提供滑动窗口、会话窗口等多种窗口类型,适应不同业务需求。 5. **强大的SQL支持**: 支持丰富的SQL...
- **批处理与流处理统一:** Flink 通过流处理模拟批处理的方式,能够同时支持实时流处理和批量数据处理,实现了流批一体的数据处理架构。 - **支持多种应用场景:** 除了流处理外,Flink 还支持交互式查询、机器...
这些数据需要经过处理,如地质语义一致性处理、数据格式标准化处理等,转化为适合建模的源数据。特别是以钻孔数据为代表的勘查工程资料,需要详细记录钻孔编号、坐标、深度、倾角、方位角、岩性等信息。矿井物探中的...
这说明智慧客服平台致力于实现多渠道融合,确保用户体验的一致性。此外,智能路由技术在客服平台中起着核心作用,它通过智能识别用户、智能路由引擎,将最佳的用户分配给最佳的客服,以提高响应效率和转化率。 在多...
- **深度集成**:具备与一体化信息平台其他组成部分的深度集成能力。 - **技术服务**:提供一致、全面、快捷的技术服务。 #### 二、平台建设历程 - **启动时间**:SG-UAP平台自2012年启动建设。 - **版本迭代**...
系统特点包括数据的一致性、语义理解和高效检索,这在消防档案管理中至关重要。 最后,论文提出了一个深入的异构数据集成方法,它以数据信息的语义为基础,将系统间的异构数据集成建立在领域属性之上,而在系统内部...
案例表明,利用CAD/CAM技术能够克服传统模具设计和制造中的不足,实现模具设计的高精度、一致性、互换性以及尺寸的准确性。最终,模具CAD/CAM的一体化有效提高了企业在市场上的竞争能力。 参考文献部分提供了一系列...