`
greemranqq
  • 浏览: 976946 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

flink-reduce

阅读更多

一.背景

     有时候我们需要过滤数据,有些中间数据是不需要的,比如场景:

     binlog 数据更新的时候,我们仅仅需要最新数据。会根据ID 分组,然后取version 最大的一条,存储

 

二.简单实例

  

@Data
@ToString
public class Order {
    // 主键id
    private Integer id;
    // 版本
    private Integer version;
    private Timestamp mdTime;

    public Order(int id, Integer version) {
        this.id = id;
        this.version = version;
        this.mdTime = new Timestamp(System.currentTimeMillis());
    }

    public Order() {
    }
}

 

 

public class OrderSource implements SourceFunction<Order> {
    Random random = new Random();

    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        while (true) {
            TimeUnit.MILLISECONDS.sleep(100);
            // 为了区分,我们简单生0~2的id, 和版本0~99
            int id = random.nextInt(3);
            Order o = new Order(id, random.nextInt(100));
            ctx.collect(o);
        }
    }
    @Override
    public void cancel() {

    }
}

 

public class ReduceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Order> userInfoDataStream = env.addSource(new OrderSource());

        DataStream<Order> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
            @Override
            public long extractAscendingTimestamp(Order element) {
                return element.getMdTime().getTime();
            }
        });

        SingleOutputStreamOperator<Order> reduce = timedData
                .keyBy("id")
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .reduce((ReduceFunction<Order>) (v1, v2) -> v1.getVersion() >= v2.getVersion() ? v1 : v2);

        reduce.print();
        env.execute("test");
    }
}

 

结果:

Order(id=2, version=97, mdTime=2019-03-11 17:39:34.052)

Order(id=0, version=99, mdTime=2019-03-11 17:39:32.913)

Order(id=1, version=96, mdTime=2019-03-11 17:39:34.155)

Order(id=2, version=97, mdTime=2019-03-11 17:39:34.052)

Order(id=1, version=96, mdTime=2019-03-11 17:39:34.155)

Order(id=0, version=99, mdTime=2019-03-11 17:39:32.913)

 

这个会对同一个窗口做过滤,比如同步到另一个mysql,hdfs,就能减少数据量

 

 

 

0
0
分享到:
评论

相关推荐

    flink-1.13.6-bin-scala_2.11.tgz

    1. DataStream API:针对无界数据流的处理,提供了丰富的操作符,如 map、filter、reduce 等,用于构建数据处理逻辑。 2. Batch API(DataSet API):针对有界数据集的处理,适合离线批处理任务。 3. Table & SQL ...

    flink-1.14.4 安装包 scala 2.12

    - **Transformation**: 包括 Map、FlatMap、Filter、KeyBy、Reduce 等操作,用于对数据进行处理。 - **Stateful Processing**: 支持状态计算,允许程序记住之前处理过的数据。 - **Windowing**: 提供时间窗口和滑动...

    flink-1.10.1-bin-scala_2.11.tgz

    2. 处理算子:包括 Map、Filter、Reduce、Join 等基础操作,以及 Key-Value、Window、State 等高级操作。 3. 窗口机制:Flink 提供了基于时间、滑动、会话等多种窗口操作,适应不同实时场景的需求。 4. 批处理:...

    flink-1.14.0.rar

    4. **运算符与窗口**:Flink提供了一系列的运算符,如Map、Filter、KeyBy、Reduce等,以及时间窗口和滑动窗口等窗口操作,用于处理流数据。窗口操作允许开发者根据时间或数据量对数据流进行分组,进行聚合或计算。 ...

    flink-1.8.0-bin-scala_2.12

    3. 应用map、flatMap、keyBy和reduce操作进行数据处理。 4. 将结果写入输出(如Console或File)。 五、总结 Apache Flink 1.8.0作为实时计算的重要平台,通过不断优化和创新,为大数据处理提供了高效、可靠和灵活...

    flink-1.2.1安装包

    - **DataStream API和DataSet API**: 分别用于处理无界和有界数据,提供丰富的算子集如map、filter、reduce等,以及窗口操作,满足不同场景需求。 - **JobManager和TaskManager**: JobManager负责任务调度和资源管理...

    flink-kafka-consumer-master.zip_agodss_flink_flinlk_java_pour2gn

    Flink是一个开源的流处理框架,它支持事件时间和窗口计算,提供了丰富的数据处理算子,如`map`、`filter`、`reduce`等,用于对数据流进行转换和聚合。`map`是Flink中最基本的操作之一,它接受一个函数,将输入数据流...

    flink-1.7.zip

    它提供了丰富的算子,如map、filter、reduce等,以及窗口操作,用于处理实时数据流。 - **Event Time**:Flink 支持事件时间处理,即按照数据发生的时间顺序进行处理,这在处理乱序事件时非常关键。 - **Stateful ...

    项目3-Flink-流批一体API1

    接下来是Transformation,它定义了数据流的操作,如map、filter、reduce等,这些操作在数据流上进行,形成新的数据流。最后,Sink将处理后的数据输出到指定的目标,如文件、数据库或消息队列。 总结来说,Flink的流...

    flink-1.5.4-src.tgz

    在 1.5.4 版本中,DataStream API 提供了丰富的算子,如 Map、Filter、Reduce、Join 等,以及窗口操作,支持时间驱动和事件驱动的窗口计算。源码中可以看到,这些算子的实现细节和如何与其他组件交互。 3. **State...

    flink读取kafka数据.zip

    Flink 提供丰富的操作符,如 map、filter、reduce、keyBy 和 window,用于对数据流进行各种业务逻辑计算。KeyBy 操作用于将数据流按照特定字段分组,以便在每个分组内进行聚合操作。Window 可以定义时间或事件触发...

    HadoopCon2016-Apache-Flink-Tutorial-DataStream-API

    ### Apache Flink DataStream API 教程:深入理解流处理技术 #### 一、概述 在本次教程中,我们将深入探讨 Apache Flink 的 DataStream API,这是一个强大的工具,用于实现高性能的大规模数据流处理任务。Flink 的...

    flink-study:Flink学习

    《Flink学习:深入探索Java实现的流处理框架》 Apache Flink是一个开源的流处理框架,它在大数据处理领域有着广泛的应用。Flink的设计目标是支持低延迟、高吞吐量的数据处理,并且能够同时处理实时数据流和批处理...

    flink-1.6.4-src.tgz

    Stream API 是Flink处理无界数据流的核心接口,包括各种算子如map、filter、reduce等,以及窗口操作,如时间窗口、滑动窗口和Session窗口。在1.6.4版本中,DataStream API 已经相当成熟,能够支持复杂事件处理和...

    flink-training-exercises

    - **Transformation**:包括Map、FlatMap、Filter、KeyBy、Reduce、Join等操作,用于对数据进行转换和处理。 - **Sink Functions**:定义如何将处理后的数据写入目标系统,如HDFS、Cassandra或Kafka。 3. **窗口...

    flink-1.7超详细中文教程

    4. **转换与操作**:详细介绍各种转换操作,如map、filter、keyBy、reduce等。 5. **数据 sink**:介绍如何将处理结果写入到不同的数据存储,如HDFS、Cassandra等。 **三、Flink实例解析** 实例部分通过实际案例...

    Flink教程收集

    3. **数据源与数据转换**:介绍如何定义数据源,如Kafka、Socket或自定义数据源,以及如何通过各种转换操作(如map、filter、reduce)处理数据。 4. **数据并行与数据分布**:讲解Flink如何实现数据的并行处理,...

    flink-learning-from-zhisheng:学习Flink的基本知识

    数据经过Source后,可以通过各种Transformation(如Map、Filter、KeyBy、Reduce等)进行处理,这些操作可以构建出复杂的数据处理逻辑。 三、窗口与时间语义 Flink提供了滑动窗口、会话窗口和 tumbling 窗口等多种...

    apache-flink

    数据集变换包含了一系列的转换操作,比如连接(join)、笛卡尔积(cross)、规约(reduce)、聚合(aggregate)、共组(cogroup)、映射(map)、扁平映射(flatMap)、过滤(filter)、投影(project)、去重...

    flink-samples:Flink样本

    4. **数据转换**:包括 Map、Filter、KeyBy、Window、Reduce、Join 等操作的示例,以及状态管理和时间窗口的概念。 5. **数据聚合**:如何对数据进行分组、求和、平均等统计计算。 6. **状态管理**:理解 Flink 的...

Global site tag (gtag.js) - Google Analytics