一.背景
有时候我们需要过滤数据,有些中间数据是不需要的,比如场景:
binlog 数据更新的时候,我们仅仅需要最新数据。会根据ID 分组,然后取version 最大的一条,存储
二.简单实例
@Data @ToString public class Order { // 主键id private Integer id; // 版本 private Integer version; private Timestamp mdTime; public Order(int id, Integer version) { this.id = id; this.version = version; this.mdTime = new Timestamp(System.currentTimeMillis()); } public Order() { } }
public class OrderSource implements SourceFunction<Order> { Random random = new Random(); @Override public void run(SourceContext<Order> ctx) throws Exception { while (true) { TimeUnit.MILLISECONDS.sleep(100); // 为了区分,我们简单生0~2的id, 和版本0~99 int id = random.nextInt(3); Order o = new Order(id, random.nextInt(100)); ctx.collect(o); } } @Override public void cancel() { } }
public class ReduceApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Order> userInfoDataStream = env.addSource(new OrderSource()); DataStream<Order> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() { @Override public long extractAscendingTimestamp(Order element) { return element.getMdTime().getTime(); } }); SingleOutputStreamOperator<Order> reduce = timedData .keyBy("id") .timeWindow(Time.seconds(10), Time.seconds(5)) .reduce((ReduceFunction<Order>) (v1, v2) -> v1.getVersion() >= v2.getVersion() ? v1 : v2); reduce.print(); env.execute("test"); } }
结果:
Order(id=2, version=97, mdTime=2019-03-11 17:39:34.052)
Order(id=0, version=99, mdTime=2019-03-11 17:39:32.913)
Order(id=1, version=96, mdTime=2019-03-11 17:39:34.155)
Order(id=2, version=97, mdTime=2019-03-11 17:39:34.052)
Order(id=1, version=96, mdTime=2019-03-11 17:39:34.155)
Order(id=0, version=99, mdTime=2019-03-11 17:39:32.913)
这个会对同一个窗口做过滤,比如同步到另一个mysql,hdfs,就能减少数据量
相关推荐
1. DataStream API:针对无界数据流的处理,提供了丰富的操作符,如 map、filter、reduce 等,用于构建数据处理逻辑。 2. Batch API(DataSet API):针对有界数据集的处理,适合离线批处理任务。 3. Table & SQL ...
- **Transformation**: 包括 Map、FlatMap、Filter、KeyBy、Reduce 等操作,用于对数据进行处理。 - **Stateful Processing**: 支持状态计算,允许程序记住之前处理过的数据。 - **Windowing**: 提供时间窗口和滑动...
2. 处理算子:包括 Map、Filter、Reduce、Join 等基础操作,以及 Key-Value、Window、State 等高级操作。 3. 窗口机制:Flink 提供了基于时间、滑动、会话等多种窗口操作,适应不同实时场景的需求。 4. 批处理:...
4. **运算符与窗口**:Flink提供了一系列的运算符,如Map、Filter、KeyBy、Reduce等,以及时间窗口和滑动窗口等窗口操作,用于处理流数据。窗口操作允许开发者根据时间或数据量对数据流进行分组,进行聚合或计算。 ...
3. 应用map、flatMap、keyBy和reduce操作进行数据处理。 4. 将结果写入输出(如Console或File)。 五、总结 Apache Flink 1.8.0作为实时计算的重要平台,通过不断优化和创新,为大数据处理提供了高效、可靠和灵活...
- **DataStream API和DataSet API**: 分别用于处理无界和有界数据,提供丰富的算子集如map、filter、reduce等,以及窗口操作,满足不同场景需求。 - **JobManager和TaskManager**: JobManager负责任务调度和资源管理...
Flink是一个开源的流处理框架,它支持事件时间和窗口计算,提供了丰富的数据处理算子,如`map`、`filter`、`reduce`等,用于对数据流进行转换和聚合。`map`是Flink中最基本的操作之一,它接受一个函数,将输入数据流...
它提供了丰富的算子,如map、filter、reduce等,以及窗口操作,用于处理实时数据流。 - **Event Time**:Flink 支持事件时间处理,即按照数据发生的时间顺序进行处理,这在处理乱序事件时非常关键。 - **Stateful ...
接下来是Transformation,它定义了数据流的操作,如map、filter、reduce等,这些操作在数据流上进行,形成新的数据流。最后,Sink将处理后的数据输出到指定的目标,如文件、数据库或消息队列。 总结来说,Flink的流...
在 1.5.4 版本中,DataStream API 提供了丰富的算子,如 Map、Filter、Reduce、Join 等,以及窗口操作,支持时间驱动和事件驱动的窗口计算。源码中可以看到,这些算子的实现细节和如何与其他组件交互。 3. **State...
Flink 提供丰富的操作符,如 map、filter、reduce、keyBy 和 window,用于对数据流进行各种业务逻辑计算。KeyBy 操作用于将数据流按照特定字段分组,以便在每个分组内进行聚合操作。Window 可以定义时间或事件触发...
### Apache Flink DataStream API 教程:深入理解流处理技术 #### 一、概述 在本次教程中,我们将深入探讨 Apache Flink 的 DataStream API,这是一个强大的工具,用于实现高性能的大规模数据流处理任务。Flink 的...
《Flink学习:深入探索Java实现的流处理框架》 Apache Flink是一个开源的流处理框架,它在大数据处理领域有着广泛的应用。Flink的设计目标是支持低延迟、高吞吐量的数据处理,并且能够同时处理实时数据流和批处理...
Stream API 是Flink处理无界数据流的核心接口,包括各种算子如map、filter、reduce等,以及窗口操作,如时间窗口、滑动窗口和Session窗口。在1.6.4版本中,DataStream API 已经相当成熟,能够支持复杂事件处理和...
- **Transformation**:包括Map、FlatMap、Filter、KeyBy、Reduce、Join等操作,用于对数据进行转换和处理。 - **Sink Functions**:定义如何将处理后的数据写入目标系统,如HDFS、Cassandra或Kafka。 3. **窗口...
4. **转换与操作**:详细介绍各种转换操作,如map、filter、keyBy、reduce等。 5. **数据 sink**:介绍如何将处理结果写入到不同的数据存储,如HDFS、Cassandra等。 **三、Flink实例解析** 实例部分通过实际案例...
3. **数据源与数据转换**:介绍如何定义数据源,如Kafka、Socket或自定义数据源,以及如何通过各种转换操作(如map、filter、reduce)处理数据。 4. **数据并行与数据分布**:讲解Flink如何实现数据的并行处理,...
数据经过Source后,可以通过各种Transformation(如Map、Filter、KeyBy、Reduce等)进行处理,这些操作可以构建出复杂的数据处理逻辑。 三、窗口与时间语义 Flink提供了滑动窗口、会话窗口和 tumbling 窗口等多种...
数据集变换包含了一系列的转换操作,比如连接(join)、笛卡尔积(cross)、规约(reduce)、聚合(aggregate)、共组(cogroup)、映射(map)、扁平映射(flatMap)、过滤(filter)、投影(project)、去重...
4. **数据转换**:包括 Map、Filter、KeyBy、Window、Reduce、Join 等操作的示例,以及状态管理和时间窗口的概念。 5. **数据聚合**:如何对数据进行分组、求和、平均等统计计算。 6. **状态管理**:理解 Flink 的...