`
greemranqq
  • 浏览: 980654 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

flink-watermark

阅读更多

一.背景

     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计。这里的延迟统计分为两种:

       模拟初始数据:早上10:10:00  用户点击了一次,但是延迟到10:10:05 才发送过来,允许最大延迟5秒, 5秒窗口统计。我们希望还是能统计到

       

二.基本代码

@Data
public class UserTimeInfo implements Serializable {

    private String userId;
    /** 实际时间-偏移量 偏移后的时间*/
    private Timestamp pTime;
    public UserTimeInfo() {
    }
    public UserTimeInfo(String userId, Timestamp pTime) {
        this.userId = userId;
        this.pTime = pTime;
    }
}

 

public class UserTimeSource implements SourceFunction<UserTimeInfo> {


    /**
     * 为了id 统计方便,我们只留一个id
     */
    static String[] userIds = {"id->"};
    Random random = new Random();
    /**
     * 模拟发送20次
     */
    int times = 20;

    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % userIds.length);
            // 随机延迟几秒
            int defTime = random.nextInt(5);
            // 发送时间
            DateTime dateTime = new DateTime();
            // 计算延迟后的时间,并且打印时间
            DateTime dateTimePrint = dateTime.plusSeconds(-defTime);
            System.out.println("实际时间:" + print(dateTime) + ",延迟:" + defTime + ":-->" + print(dateTimePrint));
            // 发送延迟时间
            dateTime = dateTime.plusSeconds(-defTime);
            sc.collect(new UserTimeInfo(userIds[m], new Timestamp(dateTime.getMillis())));
            // 只持续固定时间方便观察
            if (--times == 0) {
                break;
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }

    private static String print(DateTime dateTime) {
        return dateTime.toString("yyyy-MM-dd hh:mm:ss");
    }
}

 

三.定义我们的两种watermark

    a. 基于系统时间 

   

/**
 * 这里逻辑,模拟按系统时间进行统计
 * 所有数据和系统自身时间有关
 */
public class UserTimeWaterMarkBySystem implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;
    /**
     * 该时间由于基于系统时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15  
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.注意,如果程序挂了,12点重启消费这个数据,就统计不到了
     * @return
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        return timestamp;
    }
}

 

  b.根据数据自生时间进行做延迟判断

   

public class UserTimeWaterMarkByRowTime implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;

    /**
     * 该时间由于基于数据时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.只要消息 时间延迟小于5 就能被统计。 
     * 这种对点击事件来说,更符合要求
     * @return
     */
    private long currentMaxTimestamp;
    
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}

 

四.source 类,和以前一样

  

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());
        //  UserTimeWaterMarkByRowTime 这个时间可以替换
        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}

 

 

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());

        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

 

小结:

   1.这个是基于flink 1.7 跑的

   2.代码比较简单,也好理解,有问题直接私信我 

 

 

 

0
0
分享到:
评论

相关推荐

    GUI面板MATLAB香烟汉字识别.zip

    GUI面板MATLAB香烟汉字识别

    2023年统招专升本计算机考试真题及答案6.pdf

    2023年统招专升本计算机考试真题及答案6.pdf

    Java毕业设计-SpringBoot+Vue的“漫画之家”系统(附源码、数据库、教程).zip

    Java 项目, Java 毕业设计,Java 课程设计,基于 SpringBoot 开发的,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。 项目都经过严格调试,确保可以运行! 1. 技术组成 前端:html、javascript、Vue 后台框架:SpringBoot 开发环境:idea 数据库:MySql(建议用 5.7 版本,8.0 有时候会有坑) 数据库工具:navicat 部署环境:Tomcat(建议用 7.x 或者 8.x 版本), maven 2. 部署 如果部署有疑问的话,可以找我咨询 Java工具包下载地址: https://pan.quark.cn/s/eb24351ebac4 后台路径地址:localhost:8080/项目名称/admin/dist/index.html 前台路径地址:localhost:8080/项目名称/front/index.html (无前台不需要输入)

    基于ssm+vue的教学视频点播系统(java毕业设计,包括源码,数据库,教程).zip

    Java 项目, Java 毕业设计,Java 课程设计,基于 SSM 开发的,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。 项目都经过严格调试,确保可以运行! 1. 技术组成 前端:vue/html5 后台框架:SSM 开发环境:idea 数据库:MySql(建议用 5.7 版本,8.0 有时候会有坑) 数据库工具:navicat 部署环境:Tomcat(建议用 7.x 或者 8.x 版本), maven 2. 部署 如果部署有疑问的话,可以找我咨询 Java工具包下载地址: https://pan.quark.cn/s/eb24351ebac4

    Java毕业设计-基于SpringBoot+Vue+MySql的五台山景点购票系统(附源码、数据库、教程).zip

    Java 项目, Java 毕业设计,Java 课程设计,基于 SpringBoot 开发的,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。 项目都经过严格调试,确保可以运行! 1. 技术组成 前端:html、javascript、Vue 后台框架:SpringBoot 开发环境:idea 数据库:MySql(建议用 5.7 版本,8.0 有时候会有坑) 数据库工具:navicat 部署环境:Tomcat(建议用 7.x 或者 8.x 版本), maven 2. 部署 如果部署有疑问的话,可以找我咨询 Java工具包下载地址: https://pan.quark.cn/s/eb24351ebac4 后台路径地址:localhost:8080/项目名称/admin/dist/index.html 前台路径地址:localhost:8080/项目名称/front/index.html (无前台不需要输入)

    MATLAB设计的危险区域预警系统(GUI界面设计).zip

    MATLAB设计的危险区域预警系统(GUI界面设计)

    2023年江苏省计算机二级VB试卷.pdf

    2023年江苏省计算机二级VB试卷.pdf

    NSCBx1.0.1b Keys19.0.0.zip

    NSCBx1.0.1b Keys19.0.0.zip

    智慧园区一卡通与清分结算系统Word(45页).docx

    智慧园区,作为现代化城市发展的新兴模式,正逐步改变着传统园区的运营与管理方式。它并非简单的信息化升级,而是跨越了行业壁垒,实现了数据共享与业务协同的复杂运行系统。在智慧园区的构建中,人们常常陷入一些误区,如认为智慧园区可以速成、与本部门无关或等同于传统信息化。然而,智慧园区的建设需要长期规划与多方参与,它不仅关乎技术层面的革新,更涉及到管理理念的转变。通过打破信息孤岛,智慧园区实现了各系统间的无缝对接,为园区的科学决策提供了有力支持。 智慧园区的核心价值在于其提供的全方位服务与管理能力。从基础设施的智能化改造,如全面光纤接入、4G/5G网络覆盖、Wi-Fi网络及物联网技术的运用,到园区综合管理平台的建设,智慧园区打造了一个高效、便捷、安全的运营环境。在这个平台上,园区管理方可以实时掌握运营动态,包括道路状况、游客数量、设施状态及自然环境等信息,从而实现事件的提前预警与自动调配。同时,智慧园区还为园区企业提供了丰富的服务,如项目申报、资质认定、入园车辆管理及统计分析等,极大地提升了企业的运营效率。此外,智慧园区还注重用户体验,通过信息发布系统、服务门户系统及各类智慧应用,如掌上营销、智慧停车、智能安防等,为园区员工、企业及访客提供了便捷、舒适的生活与工作体验。值得一提的是,智慧园区还充分利用大数据、云计算等先进技术,对园区的能耗数据进行采集、分析与管理,实现了绿色、节能的运营目标。 在智慧园区的建设过程中,还涌现出了许多创新的应用场景。例如,在环境监测方面,智慧园区通过集成各类传感器与监控系统,实现了对园区水质、空气质量的实时监测与预警;在交通管理方面,智慧园区利用物联网技术,对园区观光车、救援车辆等进行实时定位与调度,提高了交通效率与安全性;在公共服务方面,智慧园区通过构建统一的公共服务平台,为园区居民提供了包括平安社区、便民社区、智能家居在内的多元化服务。这些创新应用不仅提升了园区的智能化水平,还为园区的可持续发展奠定了坚实基础。同时,智慧园区的建设也促进了产业链的聚合与发展,通过搭建聚合产业链平台,实现了园区内企业间的资源共享与合作共赢。总的来说,智慧园区的建设不仅提升了园区的综合竞争力,还为城市的智慧化发展树立了典范。它以用户需求为导向,以技术创新为驱动,不断推动着园区向更加智慧、高效、绿色的方向发展。对于写方案的读者而言,智慧园区的成功案例与创新应用无疑提供了宝贵的借鉴与启示,值得深入探索与学习。

    数据库系统课程设计报告-商品供应管理系统设计与开发

    一、系统需求分析 1 (一)需求概述 1 (二)业务流分析 1 (三)数据流分析 3 (四)数据字典 3 二、数据库概念结构设计 5 (一)实体分析 5 (二)属性分析 5 (三)联系分析 6 (四)概念模型分析(.PDM图) 7 三、数据库逻辑结构设计 8 (一)概念模型转化为逻辑模型 8 1.一对一关系的转化 8 2.一对多关系的转化 8 3.多对多关系的转化 8 (二)逻辑模型设计(.PDM图) 8 四、数据库物理实现 9 (一)表设计 9 (二)创建表和完整性约束代码设计 10 (三)创建视图、索引、存储过程和触发器 11 五、数据库功能调试 12 (一)职工管理模块 12 (二)工程负责人管理模块 13 (三)系统管理员管理模块 15 六、设计系统前台软件 20 (一)开发软件选择 20 (二)软件功能要求与设计 21 (三)软件功能实现 21 (四)系统测试 27 七、设计总结 28

    springboot校园在线拍卖系统.zip

    ava项目springboot基于springboot的课程设计,包含源码+数据库+毕业论文

    【人机交互】MATLAB手势识别设计.zip

    【人机交互】MATLAB手势识别设计

    【工程项目】MATLAB的人脸+指纹融合系统(结合人脸和指纹一致性方可通行).zip

    【工程项目】MATLAB的人脸+指纹融合系统(结合人脸和指纹一致性方可通行)

    2023年历年真题考试:管理系统中计算机应用历年真题汇编(共207题).pdf

    2023年历年真题考试:管理系统中计算机应用历年真题汇编(共207题).pdf

    sprinmgboot实习管理系统--论文.zip

    ava项目springboot基于springboot的课程设计,包含源码+数据库+毕业论文

    【人机交互】MATLAB信号与系统数字信号设计.zip

    【人机交互】MATLAB信号与系统数字信号设计

    Delphi 12.3控件之Chatbox-1.9.8-Setup.rar

    Delphi 12.3控件之Chatbox-1.9.8-Setup.rar

    HD-Speed绿色版是一款功能强大的磁盘读取速度测试软件,这款软件可以帮助用户经行磁盘检测、分析、清理等功能,需要的朋友欢迎来绿色资源网下载使用

    HD_Speed是一款非常小巧好用的实时磁盘读取速度测试软件。它可以比较准确地测试到磁盘的持续传输率和突发传输率一定程度上反映系统的磁盘性能,可以测试软盘、硬盘、光驱。并用曲线图方式体现出来,用它可以很直观的看出您的硬盘到底有多快而且也可以很方便的看出光驱的加速曲线。并且作为一款免费软件,测试也相当简单,大家有兴趣可以测试一下自己的磁盘性能。 HD_Speed绿色版 HD_Speed绿色版功能介绍 ●任何目录可以在Finder(即在本地机器上,一个外部驱动器或远程服务器上)可以选择立即分析。 ●自由航行在一个文件夹或子目录的分析装置。 ●平滑的动画之间的转换选定的目录管理。 ●移动鼠标到一个文件中看到它的名称和尺寸。 ●使用上下文菜单中选择文件的垃圾。 ●快速搜索文件名。 ●保存您最喜爱的位置和访问他们在主窗口中单击。

    基于SSM+JSP的高校四六级报名管理系统+数据库(Java毕业设计,包括源码,教程).zip

    Java 项目, Java 毕业设计,Java 课程设计,基于 SpringBoot 开发的,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。 项目都经过严格调试,确保可以运行! 1. 技术组成 前端:jsp 后台框架:SSM 开发环境:idea 数据库:MySql(建议用 5.7 版本,8.0 有时候会有坑) 数据库工具:navicat 部署环境:Tomcat(建议用 7.x 或者 8.x 版本), maven 2. 部署 如果部署有疑问的话,可以找我咨询 Java工具包下载地址: https://pan.quark.cn/s/eb24351ebac4

    GUI面板MATLAB芯片字符识别.zip

    GUI面板MATLAB芯片字符识别

Global site tag (gtag.js) - Google Analytics