有状态的计算作为容错以及数据一致性的保证,是当今实时计算必不可少的特性之一,流行的实时计算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分别提供对内置 State 的支持。State 的引入使得实时应用可以不依赖外部数据库来存储元数据及中间数据,部分情况下甚至可以直接用 State 存储结果数据,这让业界不禁思考: State 和 Database 是何种关系?有没有可能用 State 来代替数据库呢?
在这个课题上,Flink 社区是比较早就开始探索的。总体来说,Flink 社区的努力可以分为两条线: 一是在作业运行时通过作业查询接口访问 State 的能力,即 QueryableState;二是通过 State 的离线 dump 文件(Savepoint)来离线查询和修改 State 的能力,即即将引入的 Savepoint Processor API。
QueryableState
--------------
在 2017 年发布的 Flink 1.2 版本,Flink 引入了 QueryableState 的特性以允许用户通过特定的 client 查询作业 State 的内容 \[1\],这意味着 Flink 应用可以在完全不依赖 State 存储介质以外的外部存储的情况下提供实时访问计算结果的能力。
![数据库.jpg](https://ucc.alicdn.com/pic/developer-ecology/71e06755e9154a899da79595cd8e6104.jpg)
只通过 Queryable State 提供实时数据访问
然而,QueryableState 虽然设想上比较理想化,但由于依赖底层架构的改动较多且功能也比较受限,它一直处于 Beta 版本并不能用于生产环境。针对这个问题,在前段时间腾讯的工程师杨华提出 QueryableState 的改进计划 \[2\]。在邮件列表中,社区就 QueryableState 是否可以用于代替数据库作了讨论并出现了不同的观点。笔者结合个人见解将 State as Database 的主要优缺点整理如下。
**优点:**
* **更低的数据延迟**。一般情况下 Flink 应用的计算结果需要同步到外部的数据库,比如定时触发输出窗口计算结果,而这种同步通常是定时的会带来一定的延迟,导致计算是实时的而查询却不是实时的尴尬局面,而直接 State 则可以避免这个问题。
* **更强的数据一致性保证**。根据外部存储的特性不同,Flink Connector 或者自定义的 SinkFunction 提供的一致性保障也有所差别。比如对于不支持多行事务的 HBase,Flink 只能通过业务逻辑的幂等性来保障 Exactly-Once 投递。相比之下 State 则有妥妥的 Exactly-Once 投递保证。
* **节省资源**。因为减少了同步数据到外部存储的需要,我们可以节省序列化和网络传输的成本,另外当然还可以节省数据库成本。
**缺点:**
* **SLA 保障不足**。数据库技术已经非常成熟,在可用性、容错性和运维上都很多的积累,在这点上 State 还相当于是处于原始人时期。另外从定位上来看,Flink 作业有版本迭代维护或者遇到错误自动重启带来的 down time,并不能达到数据库在数据访问上的高可用性。
* **可能导致作业的不稳定**。未经过考虑的 Ad-hoc Query 可能会要求扫描并返回夸张量级的数据,这会系统带来很大的负荷,很可能影响作业的正常执行。即使是合理的 Query,在并发数较多的情况下也可能影响作业的执行效率。
* **存储数据量不能太大**。State 运行时主要存储在 TaskManager 本地内存和磁盘,State 过大会造成 TaskManager OOM 或者磁盘空间不足。另外 State 大意味着 checkpoint 大,导致 checkpoint 可能会超时并显著延长作业恢复时长。
* **只支持最基础的查询**。State 只能进行最简单的数据结构查询,不能像关系型数据库一样提供函数等计算能力,也不支持谓词下推等优化技术。
* **只可以读取,不能修改**。State 在运行时只可以被作业本身修改,如果实在要修改 State 只能通过下文的 Savepoint Processor API 来实现。
总体来说,目前 State 代替数据库的缺点还是远多于其优点,不过对于某些对数据可用性要求不高的作业来说,使用 State 作为数据库还是完全合理的。由于定位上的不同,Flink State 在短时间内很难看到可以完全替代数据库的可能性,但在数据访问特性上 State 往数据库方向发展是无需质疑的。
Savepoint Processor API
-----------------------
Savepoint Processor API 是社区最近提出的一个新特性(见 FLIP-42 \[3\]),用于离线对 State 的 dump 文件 Savepoint 进行分析、修改或者直接根据数据构建出一个初始的 Savepoint。Savepoint Processor API 属于 Flink State Evolution 的 State Management。如果说 QueryableState 是 DSL 的话,Flink State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最为重要的部分。
Savepoint Processor API 的前身是第三方的 Bravo 项目 \[4\],主要思路提供 Savepoint 和 DataSet 相互转换的能力,典型应用是 Savepoint 读取成 DataSet,在 DataSet 上进行修改,然后再写为一个新的 Savepoint。这适合用于以下的场景:
* 分析作业 State 以研究其模式和规律
* 排查问题或者审计
* 为新的应用构建的初始 State
* 修改 Savepoint,比如:
* 改变作业最大并行度
* 进行巨大的 Schema 改动
* 修正有问题的 State
Savepoint 作为 State 的 dump 文件,通过 Savepoint Processor API 可以暴露数据查询和修改功能,类似于一个离线的数据库,但 State 的概念和典型关系型数据的概念还是有很多不同,FLIP-43 也对这些差异进行了类比和总结。
首先 Savepoint 是多个 operator 的 state 的物理存储集合,不同 operator 的 state 是独立的,这类似于数据库下不同 namespace 之间的 table。我们可以得到 Savepoint 对应数据库,单个 operator 对应 Namespace。
Database
Savepoint
Namespace
Uid
Table
State
但就 table 而言,其在 Savepoint 里对应的概念根据 State 类型的不同而有所差别。State 有 Operator State、Keyed State 和 Broadcast State 三种,其中 Operator State 和 Broadcast State 属于 non-partitioned state,即没有按 key 分区的 state,而相反地 Keyed State 则属于 partitioned state。对于 non-partitioned state 来说,state 是一个 table,state 的每个元素即是 table 里的一行;而对于 partitioned state 来说,同一个 operator 下的所有 state 对应一个 table。这个 table 像是 HBase 一样有个 row key,然后每个具体的 state 对应 table 里的一个 column。
举个例子,假设有一个游戏玩家得分和在线时长的数据流,我们需要用 Keyed State 来记录玩家所在组的分数和游戏时长,用 Operator State 记录玩家的总得分和总时长。
在一段时间内数据流的输入如下:
user\_id
user\_name
user\_group
score
1001
Paul
A
5,000
1002
Charlotte
A
3,600
1003
Kate
C
2,000
1004
Robert
B
3,900
user\_id
user\_name
user\_group
time
1001
Paul
A
1,800
1002
Charlotte
A
1,200
1003
Kate
C
600
1004
Robert
B
2,000
用 Keyed State ,我们分别注册 group\_score 和 group\_time 两个 MapState 表示组总得分和组总时长,并根据 user\_group keyby 数据流之后将两个指标的累积值更新到 State 里,得到的表如下:
user\_group
group\_score
group\_time
A
8,600
3,000
C
2,00
600
B
3,900
2,000
相对地,假如用 Operator State 来记录总得分和总时长(并行度设为 1),我们注册 total\_score 和 total\_time 两个 State,得到的表有两个:
total\_score |
\------- |
14,500 |
total\_time
5,600
至此 Savepoint 和 Database 的对应关系应该是比较清晰明了的。而对于 Savepoint 来说还有不同的 StateBackend 来决定 State 具体如何持续化,这显然对应的是数据库的存储引擎。在 MySQL 中,我们可以通过简单的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 来改变存储引擎,在背后 MySQL 会自动完成繁琐的格式转换工作。而对于 Savepoint 来说,由于 StateBackend 各自的存储格式不兼容,目前尚不能方便地切换 StateBackend。为此,社区在不久前创建 FLIP-41 \[5\] 来进一步完善 Savepoint 的可操作性。
总结
--
State as Database 是实时计算发展的大趋势,它并不是要代替数据库的使用,而是借鉴数据库领域的经验拓展 State 接口使其操作方式更接近我们熟悉的数据库。对于 Flink 而言,State 的外部使用可以分为在线的实时访问和离线的访问和修改,分别将由 Queryable State 和 Savepoint Processor API 两个特性支持。
---
[原文链接](https://yq.aliyun.com/articles/726754?utm_content=g_1000088921)
本文为云栖社区原创内容,未经允许不得转载。
分享到:
相关推荐
Flink State 主要有五种类型:Value State、List State、Reducing State、Aggregating State 和 Folding State。 1. Value State:用于存储单个值的状态信息,例如计数器、累加器等。 2. List State:用于存储一个...
Flink State 最佳实践 Flink State 是 Apache Flink 中的一种状态管理机制,用于管理流式计算中的状态信息。Flink State 的正确使用是确保 Flink 应用程序正确运行的关键。 Operator State 和 Keyed State 是 ...
11、Flink 源码解析 —— Flink JobManager 有什么作用? 12、Flink 源码解析 —— Flink TaskManager 有什么作用? 13、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程 14、Flink 源码解析 —— ...
"Flink State 优化与 Remote State 探索" Flink state 优化是指对 Flink 中的状态(State)进行优化,以提高 Flink 任务的性能和可靠性。状态是 Flink 任务中的一种特殊数据结构,用于存储任务执行过程中的中间结果...
在实际应用中,Flink 往往需要将处理后的流数据写入数据库,以供进一步分析或实时查询。然而,如果直接使用默认的单条写入方式,可能会导致数据库压力过大,写入速度慢,进而引发反压问题。因此,本文将详细介绍如何...
这通常包括Flink运行时、相关的连接器和可能的配置文件。 标签"flink flink-cdc"进一步确认了这个资源包与Flink和其CDC功能有关。Flink-CDC允许用户从支持的数据库(如MySQL)中实时摄取变更事件,从而构建实时数据...
赠送jar包:flink-queryable-state-client-java-1.10.0.jar; 赠送原API文档:flink-queryable-state-client-java-1.10.0-javadoc.jar; 赠送源代码:flink-queryable-state-client-java-1.10.0-sources.jar; 赠送...
在大数据处理领域,Apache Flink 是一款强大的流处理框架,其Change Data Capture (CDC)功能使得实时监控并处理数据库变化成为可能。在这个场景下,我们将深入探讨如何使用Flink CDC来监测MySQL数据库,并且实现...
Flink State体系剖析以及案例实践,State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同,flink定义了多种state,分别应用于不同的场景,具体实现如下:ValueState、ListState、MapState、...
在压缩包文件"**FlinkCDC-PG-main**"中,可能包含了示例代码、配置文件、README文档等资源,用于指导用户如何设置和运行这个特定的Flink CDC监控PG数据库的案例。通过阅读和运行这些示例,你可以更深入地了解Flink ...
本主题将探讨如何自定义 Flink `SourceFunction` 定时从数据库中读取数据,这一方法对于实时监控、数据分析等场景尤其有用。本文将深入讲解实现思路并提供 Java 代码示例,确保对各种关系型数据库通用。 1. **Flink...
Flink1.14.4自定义flink-connector-jdbc连接SQLServer和SAP数据库
4. **批量写入数据库**:为了将处理后的数据批量写入数据库,我们可以使用 Flink 的 JDBC 输出格式。首先,我们需要配置数据库连接参数,然后创建一个 `JDBCOutputFormat` 并设置 SQL 插入语句。 ```java ...
flink state manage 原理, ppt,非常好的资料:State Management in Apache Flink® Consistent Stateful Distributed Stream Processing
如果没有状态的管理,是不会有累计的效果的,所以Flink里面还有state的概念。 在Flink中,我们可以使用托管状态,由Flink框架管理的状态,我们通常使用的就是这种。托管状态有五种类型:ValueState、ListState、...
赠送jar包:flink-queryable-state-client-java-1.13.2.jar; 赠送原API文档:flink-queryable-state-client-java-1.13.2-javadoc.jar; 赠送源代码:flink-queryable-state-client-java-1.13.2-sources.jar; 赠送...
Apache Flink 进阶(十):Flink State 最佳实践 141 Apache Flink 进阶(十一):TensorFlow On Flink 149 Apache Flink 进阶(十二):深度探索 Flink SQL 159 Apache Flink 进阶(十三):Python API 应用实践 ...
标题中的“flink-sql集成rabbitmq”指的是将Apache Flink的数据流处理能力与RabbitMQ消息队列系统相结合,实现数据的实时处理和传输。Flink是一个开源的流处理框架,提供低延迟、高吞吐量的数据处理,而RabbitMQ是一...
赠送jar包:flink-queryable-state-client-java-1.14.3.jar; 赠送原API文档:flink-queryable-state-client-java-1.14.3-javadoc.jar; 赠送源代码:flink-queryable-state-client-java-1.14.3-sources.jar; 赠送...
标题中的“flink cdc postgresql”指的是一项技术整合,即使用Apache Flink的数据变更数据捕获(CDC,Change Data Capture)功能来实时同步PostgreSQL数据库的数据变化。这项技术在大数据实时处理和流计算场景中非常...