`
greemranqq
  • 浏览: 975416 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

flink-watermark

阅读更多

一.背景

     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计。这里的延迟统计分为两种:

       模拟初始数据:早上10:10:00  用户点击了一次,但是延迟到10:10:05 才发送过来,允许最大延迟5秒, 5秒窗口统计。我们希望还是能统计到

       

二.基本代码

@Data
public class UserTimeInfo implements Serializable {

    private String userId;
    /** 实际时间-偏移量 偏移后的时间*/
    private Timestamp pTime;
    public UserTimeInfo() {
    }
    public UserTimeInfo(String userId, Timestamp pTime) {
        this.userId = userId;
        this.pTime = pTime;
    }
}

 

public class UserTimeSource implements SourceFunction<UserTimeInfo> {


    /**
     * 为了id 统计方便,我们只留一个id
     */
    static String[] userIds = {"id->"};
    Random random = new Random();
    /**
     * 模拟发送20次
     */
    int times = 20;

    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % userIds.length);
            // 随机延迟几秒
            int defTime = random.nextInt(5);
            // 发送时间
            DateTime dateTime = new DateTime();
            // 计算延迟后的时间,并且打印时间
            DateTime dateTimePrint = dateTime.plusSeconds(-defTime);
            System.out.println("实际时间:" + print(dateTime) + ",延迟:" + defTime + ":-->" + print(dateTimePrint));
            // 发送延迟时间
            dateTime = dateTime.plusSeconds(-defTime);
            sc.collect(new UserTimeInfo(userIds[m], new Timestamp(dateTime.getMillis())));
            // 只持续固定时间方便观察
            if (--times == 0) {
                break;
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }

    private static String print(DateTime dateTime) {
        return dateTime.toString("yyyy-MM-dd hh:mm:ss");
    }
}

 

三.定义我们的两种watermark

    a. 基于系统时间 

   

/**
 * 这里逻辑,模拟按系统时间进行统计
 * 所有数据和系统自身时间有关
 */
public class UserTimeWaterMarkBySystem implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;
    /**
     * 该时间由于基于系统时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15  
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.注意,如果程序挂了,12点重启消费这个数据,就统计不到了
     * @return
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        return timestamp;
    }
}

 

  b.根据数据自生时间进行做延迟判断

   

public class UserTimeWaterMarkByRowTime implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;

    /**
     * 该时间由于基于数据时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.只要消息 时间延迟小于5 就能被统计。 
     * 这种对点击事件来说,更符合要求
     * @return
     */
    private long currentMaxTimestamp;
    
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}

 

四.source 类,和以前一样

  

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());
        //  UserTimeWaterMarkByRowTime 这个时间可以替换
        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}

 

 

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());

        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

 

小结:

   1.这个是基于flink 1.7 跑的

   2.代码比较简单,也好理解,有问题直接私信我 

 

 

 

0
0
分享到:
评论

相关推荐

    flink-java-本地例子

    WATERMARK FOR proctime AS proctime - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'your-topic', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' ); CREATE ...

    flink-1.3.0-bin-hadoop2-scala_2.10.tgz

    此外,Flink引入了水印(Watermark)机制,用于处理乱序事件,确保处理时间接近于事件时间,提高了实时处理的准确性。 3. **状态管理**:Flink 强大的状态管理能力在1.3.0版本中得到进一步强化,提供了更可靠的...

    Stream Processing with Apache Flink-epub.zip

    书中会详细介绍如何配置和使用Flink的事件时间机制,以及如何设置水印(Watermark)来处理延迟到达的数据。 Flink的DataStream API是用于处理无界和有界数据流的主要接口。书中有详细的章节讲解如何使用这些API构建...

    项目4-Flink-高级API1

    3. Time:Flink的时间处理机制是另一个关键特性,它支持事件时间(EventTime)处理,通过Watermark机制解决了数据乱序和迟到问题。事件时间允许系统根据事件发生的实际时间进行计算,而不是处理时间,增强了处理实时...

    Flink SQL大数据项目实战(基于Flink1.14.3版本)

    本课程以FlinkSQL流批一体技术为主线,全面讲解Flink Table编程、SQL编程、Time与WaterMark、Window操作、函数使用、元数据管理,最后以一个完整的实战项目为例,详细讲解FlinkSQL的流式项目开发。学完本课程,希望...

    flink-local-train:flink入门到项目实践

    文章会对Flink中基本API如:DataSet、DataStream、Table、Sql和常用特性如:Time&Window、窗口函数、Watermark、触发器、分布式缓存、异步IO、侧输出、广播和高级应用如:ProcessFunction、状态管理等知识点进行整理...

    flink多并行数据源下的waterMark触发机制1.pdf

    Flink 多并行数据源下的 WaterMark 触发机制 Flink 是一个基于事件时间的流处理引擎,它可以处理大规模数据流。然而,在处理多并行数据源时,Flink 需要一种机制来触发 watermark,以便正确地处理事件时间。这篇...

    flink-source-code-analysis:Apache Flink源码分析系列,基于git tag 1.1.2

    使用WaterMark技术实现了窗口计算中延迟数据的处理,同时对流式计算的窗口时间定义分类:处理时间,摄取时间,事件时间本人觉得flink的这些特性一定程序上可以窥探出大数据的未来方向,所以花了些时间来阅读源码,先...

    Flink实战脑图.mmap.xmind

    内容包含(Flink简介-Flink编程模型-重要概念-Task划分-共享资源槽-Flink的时间-Flink的Window-Flink的WaterMark-重启策略)等内容

    flink入门到精通视频教程

    7.Flink watermark与侧道输出 8.Flink状态计算 9.Flink容错checkpoint与一致性语义 10.Flink进阶 异步IO,背压,内存管理 11.Flink Table API与SQL 课程目录介绍 第一章 Flink简介 01.Flink的引入 02.什么是Flink 03...

    flink-redis-sink:使用TableSQL API帮助沉入Redis

    Thirdly, watermark needs to be supported too. 但是,如果不更改源代码,它看起来并不容易。 我想写我自己的redis-sink。 当您查看此项目时,有一些技巧需要记住: 该项目仅支持REDIS CLUSTER模式。 该项目使用...

    FlinkForwardChina2018FlinkStreamingSQL2018.pdf

    为了更好地处理无序数据,Flink增强了对事件时间的支持,包括更加灵活的水位线(watermark)机制。 #### 3. 历史表功能 历史表功能允许用户查询过去某个时间点的数据状态,这对于需要跟踪历史变化的应用场景非常...

    大数据技术之Flink教程

    - **Time Semantics & Watermark**:Flink的时间语义确保正确处理乱序事件,Watermark机制用来处理延迟事件,保证处理时间与事件时间的一致性。 **ProcessFunction API(底层API)** ProcessFunction允许用户...

    【Flink篇07】Flink之时间语义和WaterMark1

    当Flink接收到数据时,会根据数据中的最大Event Time生成Watermark,一旦Watermark超过了窗口的结束时间,那么窗口就会被触发并进行计算。 例如,假设我们有两个窗口,一个是从1s到5s,另一个是从6s到10s。当时间戳...

    flink系列-使用/教程/实例/配置/文档/代码.zip

    Flink通过Event Time和Watermark机制实现了时间窗口的概念,确保了处理无界数据流时的精确一次语义。 二、Flink使用 Flink的使用通常包括环境搭建、作业提交和监控。你可以使用Java或Scala API编写Flink程序,也...

    Flink实战脑图.xmind

    flink全站式内容纲要,针对于flink的内容,学习思路,Flink保证ExactlyOnce,Flink的WaterMark,Flink侧流输出 等

    Flink八股文-5分钟学大数据

    Flink 可以通过 watermark 机制来处理迟到数据。watermark 机制可以将迟到数据与正常数据区分开,以确保数据处理的正确性和一致性。 19. Flink 中 window 出现数据倾斜怎么解决 可以通过调整窗口大小、调整并行度...

    Flink超神文档.pdf

    - **Flink Time Watermark(水印)**: 用来追踪事件时间。 - **Allowed Lateness**: 允许迟到的数据。 #### 十一、Flink关联维表实战 **11.1 TableAPI和FlinkSQL** - **开发环境构建**: 构建支持 TableAPI 和 Flink...

    基于Flink的大数据实施城市交通监控平台.zip

    3. **事件时间和处理延迟**:理解Flink中的事件时间处理机制,它是如何确保数据的正确处理顺序,以及如何通过水印(Watermark)来处理乱序事件。 4. **大数据在交通监控中的应用**:研究如何利用大数据技术分析交通...

Global site tag (gtag.js) - Google Analytics