无论在 OLAP 还是 OLTP 领域,Join 都是业务常会涉及到且优化规则比较复杂的 SQL 语句。对于离线计算而言,经过数据库领域多年的积累,Join 语义以及实现已经十分成熟,然而对于近年来刚兴起的 Streaming SQL 来说 Join 却处于刚起步的状态。
其中最为关键的问题在于 Join 的实现依赖于缓存整个数据集,而 Streaming SQL Join 的对象却是无限的数据流,内存压力和计算效率在长期运行来说都是不可避免的问题。下文将结合 SQL 的发展解析 Flink SQL 是如何解决这些问题并实现两个数据流的 Join。
离线 Batch SQL Join 的实现
---------------------
传统的离线 Batch SQL (面向有界数据集的 SQL)有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
* Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 虽然时间效率以及空间效率都是最低的,但胜在比较灵活适用范围广,因此其变体 BNL 常被传统数据库用作为 Join 的默认基础选项。
* Sort-Merge Join 顾名思义,分为两个 Sort 和 Merge 阶段。首先将两个数据集进行分别排序,然后对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。值得注意的是,Sort-Merge 只适用于 Equi-Join(Join 条件均使用等于作为比较算子)。Sort-Merge Join 要求对两个数据集进行排序,成本很高,通常作为输入本就是有序数据集的情况下的优化方案。
* Hash Join 同样分为两个阶段,首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。第一阶段和第一个数据集分别称为 build 阶段和 build table,第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。Hash Join 效率较高但对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案。和 Sort-Merge Join 类似,Hash Join 也只适用于 Equi-Join。
实时 Streaming SQL Join
---------------------
相对于离线的 Join,实时 Streaming SQL(面向无界数据集的 SQL)无法缓存所有数据,因此 Sort-Merge Join 要求的对数据集进行排序基本是无法做到的,而 Nested-loop Join 和 Hash Join 经过一定的改良则可以满足实时 SQL 的要求。
我们通过例子来看基本的 Nested Join 在实时 Streaming SQL 的基础实现(案例及图来自 Piotr Nowojski 在 Flink Forward San Francisco 的分享\[2\])。
![img1.join-in-continuous-query-1.png](https://ucc.alicdn.com/pic/developer-ecology/4ce65c36482b46a6bc816208a87c9d04.png)
图1. Join-in-continuous-query-1
Table A 有 1、42 两个元素,Table B 有 42 一个元素,所以此时的 Join 结果会输出 42。
![img2.join-in-continuous-query-2.png](https://ucc.alicdn.com/pic/developer-ecology/c288b65ca37c4f20b3d25ea5ebff6aed.png)
图2. Join-in-continuous-query-2
接着 Table B 依次接受到三个新的元素,分别是 7、3、1。因为 1 匹配到 Table A 的元素,因此结果表再输出一个元素 1。
![img3.join-in-continuous-query-3.png](https://ucc.alicdn.com/pic/developer-ecology/2e05aac69f6546a795bc51fbc4b76159.png)
图3. Join-in-continuous-query-3
随后 Table A 出现新的输入 2、3、6,3 匹配到 Table B 的元素,因此再输出 3 到结果表。
可以看到在 Nested-Loop Join 中我们需要保存两个输入表的内容,而随着时间的增长 Table A 和 Table B 需要保存的历史数据无止境地增长,导致很不合理的内存磁盘资源占用,而且单个元素的匹配效率也会越来越低。类似的问题也存在于 Hash Join 中。
那么有没有可能设置一个缓存剔除策略,将不必要的历史数据及时清理呢?答案是肯定的,关键在于缓存剔除策略如何实现,这也是 Flink SQL 提供的三种 Join 的主要区别。
Flink SQL 的 Join
----------------
* **Regular Join**
Regular Join 是最为基础的没有缓存剔除策略的 Join。Regular Join 中两个表的输入和更新都会对全局可见,影响之后所有的 Join 结果。举例,在一个如下的 Join 查询里,Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。
```
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
```
![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")
因为历史数据不会被清理,所以 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。
* **Time-Windowed Join**
Time-Windowed Join 利用窗口给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉。值得注意的是,这里涉及到的一个问题是时间的语义,时间可以指计算发生的系统时间(即 Processing Time),也可以指从数据本身的时间字段提取的 Event Time。如果是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;如果是 Event Time,Flink 分配 Event Time 窗口并依据 Watermark 来清理数据。
以更常用的 Event Time Windowed Join 为例,一个将 Orders 订单表和 Shipments 运输单表依据订单时间和运输时间 Join 的查询如下:
```
SELECT *
FROM
Orders o,
Shipments s
WHERE
o.id = s.orderId AND
s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR
```
![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")
这个查询会为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界(图4)。
![img4.time-window-orders-lower-bound.png](https://ucc.alicdn.com/pic/developer-ecology/dc4c086da40c450d950e175afc04ce3a.png)
图4. Time-Windowed Join 的时间下界 - Orders 表
并为 Shipmenets 表设置了 s.shiptime >= o.ordertime 的时间下界(图5)。
![img5.time-window-shipment-lower-bound.png](https://ucc.alicdn.com/pic/developer-ecology/31c0210141d24d3d9c6f645301f36cab.png)
图5. Time-Windowed Join 的时间下界 - Shipment 表
因此两个输入表都只需要缓存在时间下界以上的数据,将空间占用维持在合理的范围。
不过虽然底层实现上没有问题,但如何通过 SQL 语法定义时间仍是难点。尽管在实时计算领域 Event Time、Processing Time、Watermark 这些概念已经成为业界共识,但在 SQL 领域对时间数据类型的支持仍比较弱\[4\]。因此,定义 Watermark 和时间语义都需要通过编程 API 的方式完成,比如从 DataStream 转换至 Table ,不能单纯靠 SQL 完成。这方面的支持 Flink 社区计划通过拓展 SQL 方言来完成,感兴趣的读者可以通过 FLIP-66\[7\] 来追踪进度。
* **Temporal Table Join**
虽然 Timed-Windowed Join 解决了资源问题,但也限制了使用场景: Join 两个输入流都必须有时间下界,超过之后则不可访问。这对于很多 Join 维表的业务来说是不适用的,因为很多情况下维表并没有时间界限。针对这个问题,Flink 提供了 Temporal Table Join 来满足用户需求。
Temporal Table Join 类似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是一个基于 append-only 数据流的带时间版本的视图,所以又称为 Temporal Table。Temporal Table 要求定义一个主键和用于版本化的字段(通常就是 Event Time 时间字段),以反映记录在不同时间的内容。
比如典型的一个例子是对商业订单金额进行汇率转换。假设有一个 Orders 流记录订单金额,需要和 RatesHistory 汇率流进行 Join。RatesHistory 代表不同货币转为日元的汇率,每当汇率有变化时就会有一条更新记录。两个表在某一时间节点内容如下:
![img6.temporal-table-join-example.png](https://ucc.alicdn.com/pic/developer-ecology/faacdeeffe7a428491cd89653e2be8c5.png)
图6. Temporal Table Join Example\]
我们将 RatesHistory 注册为一个名为 Rates 的 Temporal Table,设定主键为 currency,版本字段为 time。
![img7.temporal-table-registration.png](https://ucc.alicdn.com/pic/developer-ecology/df9b8e27af024ea28acf2c4f03d5e0ed.png)
图7. Temporal Table Registration\]
此后给 Rates 指定时间版本,Rates 则会基于 RatesHistory 来计算符合时间版本的汇率转换内容。
![img8.temporal-table-content.png](https://ucc.alicdn.com/pic/developer-ecology/e05d691334fa43f6ad2444eb00118d5b.png)
图8. Temporal Table Content\]
在 Rates 的帮助下,我们可以将业务逻辑用以下的查询来表达:
```
SELECT
o.amount * r.rate
FROM
Orders o,
LATERAL Table(Rates(o.time)) r
WHERE
o.currency = r.currency
```
![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")
值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中两个表是平等的,任意一个表的新记录都可以与另一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另一表在该时间节点以前的记录是不可见的。这意味着我们只需要保存 Build Side 的记录直到 Watermark 超过记录的版本字段。因为 Probe Side 的输入理论上不会再有早于 Watermark 的记录,这些版本的数据可以安全地被清理掉。
总结
--
实时领域 Streaming SQL 中的 Join 与离线 Batch SQL 中的 Join 最大不同点在于无法缓存完整数据集,而是要给缓存设定基于时间的清理条件以限制 Join 涉及的数据范围。根据清理策略的不同,Flink SQL 分别提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 来应对不同业务场景。
另外,尽管在实时计算领域 Join 可以灵活地用底层编程 API 来实现,但在 Streaming SQL 中 Join 的发展仍处于比较初级的阶段,其中关键点在于如何将时间属性合适地融入 SQL 中,这点 ISO SQL 委员会制定的 SQL 标准并没有给出完整的答案。或者从另外一个角度来讲,作为 Streaming SQL 最早的开拓者之一,Flink 社区很适合探索出一套合理的 SQL 语法反过来贡献给 ISO。
[原文链接](https://yq.aliyun.com/articles/739792?utm_content=g_1000094977)
本文为阿里云内容,未经允许不得转载。
分享到:
相关推荐
Blink项目是基于Apache Flink进行大量改进与增强的结果,旨在提供更加强大、稳定且易用的数据流处理能力。随着Flink SQL的发展,它已经逐渐成为处理大规模流式数据的重要工具之一。 #### Alibaba Blink及Flink SQL...
主要实现了流与维表的join”揭示了这个项目的核心内容,即在Flink开源框架上扩展实时SQL功能,特别是流处理与维表(通常用于存储静态参考数据)的连接操作。Flink作为一个强大的流处理和批处理框架,提供了一种高效...
Flink SQL 是 Flink 提供的一种用于数据处理的高级接口,它允许开发者用 SQL 语法来操作数据流,极大地降低了实时计算的门槛。本教程基于 2022 年的最新版本 Flink 1.14.3,将帮助你深入理解和应用 Flink SQL。 一...
Flink SQL的应用场景非常广泛,包括实时数据处理、流计算、批处理、大数据分析等。Flink SQL能够满足各个行业的需求,例如电商、金融、医疗等。 阿里巴巴Blink团队的贡献 阿里巴巴Blink团队是Flink的主要贡献者之...
Flink SQL是Flink为了简化数据处理而提供的SQL接口,允许开发者使用熟悉的SQL语法来操作数据流。 压缩包内的文件名称列表揭示了包含的表格数据: 1. `orders.tbl`: 这可能代表订单数据,通常包含关于订单编号、...
- **SQL查询**:用户可以使用标准的SQL语法来定义数据源,处理数据流,如JOIN,GROUP BY,窗口操作等,并可以提交作业到Flink集群执行。 4. **Flink SQL的特点** - **SQL支持**:Flink SQL支持标准的SQL语法,...
1. 数据接入:Web平台通常通过HTTP API或者Kafka、RabbitMQ等消息队列接收实时数据流,Flink SQL可以轻松地从这些数据源读取数据,实现数据的实时摄入。 2. 实时处理:通过Flink SQL,Web平台可以实现实时数据清洗、...
在本文中,我们将深入探讨Apache Flink的SQL支持,即FlinkSQL的入门概述。Flink是一个批流统一的数据处理框架,它的Table API和SQL提供了一种统一的方式来处理批处理和流处理任务。Table API是Java和Scala语言内置的...
动态表视图(Dynamic Tables)允许 Flink SQL 实时响应数据流的变化,提供实时更新的结果。 ### 八、Flink SQL 在实际项目中的应用 1. **实时监控**:例如,实时分析网站日志,统计在线用户、点击率等。 2. **异常...
Flink是一个流行的开源流处理框架,其SQL支持使得数据处理更加易用且强大。 在Flink 1.9之前,架构并未实现统一,对流式和批处理有不同API、翻译路径和代码实现。这导致了开发者需要对两种模式分别处理,增加了开发...
Apache Flink是一个分布式的数据流处理框架,它基于DataStream API和DataSet API,支持事件驱动的实时计算和批处理。Flink SQL是Flink生态系统的一部分,提供了SQL接口,使得非Java/Scala背景的开发者也能轻松地进行...
`FlinkJDBCInputFormat`通常用于全量数据加载,而`DebeziumDeserializationSchema`则更适合处理持续的数据流。对于自定义反序列化,可以继承`DeserializationSchema`接口,定义自己的反序列化逻辑,以适应特定的数据...
大数据之Flink教程-Table API和SQL Flink Table API和SQL是Flink的一部分,提供了批流统一的处理框架。Table API是一套内嵌在...通过学习和使用Flink Table API和SQL,我们可以更好地处理大数据和实现数据分析和挖掘。
总的来说,阿里云流计算Flink SQL通过StreamCompute 2.0平台提供了一种高效、易用的实时数据分析解决方案,不仅简化了流处理的复杂性,也提升了数据分析的时效性和准确性。这使得企业和开发者能够更好地应对大数据...
在构建实时数仓的过程中,Flink on Hive 是一种常见的解决方案,它允许用户通过Apache Flink进行批处理和流处理,实现流批一体的实时数据处理。这篇文章主要探讨了如何使用Flink与Hive进行集成,以及如何利用Flink对...
Flink SQL是Apache Flink对SQL标准的实现,它允许用户使用SQL语法来处理实时数据流。这为熟悉SQL的开发人员提供了一个友好的接口,无需学习新的API就能进行流处理。 1. **表和视图**:在Flink中,Table和View是数据...
Apache Flink SQL Cookbook 是一个非常宝贵的资源,专为开发者和数据工程师设计,旨在提供一系列实用的示例、模式和用例,以展示如何利用Apache Flink的SQL接口进行流处理和批处理任务。Flink是一个强大的开源大数据...
Flink支持跨流连接操作,如join,用于合并来自不同数据流的信息。 5.2 事件时间处理 Flink提供了强大的事件时间处理能力,可以精确地处理乱序事件,确保结果的正确性。 5.3 容器化部署 Flink可以轻松地在Docker或...
例如,使用Map函数进行简单的值变换,使用Filter进行数据过滤,使用KeyBy进行分组,使用Window进行时间窗口聚合,以及Join操作进行不同数据流的合并。此外,Flink 还支持用户自定义函数(UDF)和用户自定义算子...