在使用eventTime的时候如何处理乱序数据?我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用kafka的话,多个分区的数据无法保证有序。所以在进行window计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。Watermark是用于处理乱序事件的,用于衡量Event Time进展的机制。watermark可以翻译为水位线。
一、Watermark的核心原理
Watermark的核心本质可以理解成一个延迟触发机制。
在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做 窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全 部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处 理进度(表达数据到达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及 延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。
那么 Flink 是怎么计算 Watermak 的值呢?
Watermark =进入Flink 的最大的事件时间(mxtEventTime)-指定的延迟时间(t)
那么有 Watermark 的 Window 是怎么触发窗口函数的呢?
如果有窗口的停止时间等于或者小于 maxEventTime - t(当时的warkmark),那么这个窗口被触发执行。
其核心处理流程如下图所示。
二、Watermark的三种使用情况
1、本来有序的Stream中的 Watermark
如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺 序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置延迟了,那么t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位 线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。这种情况其实是乱序数据的一种特殊情况。
2、乱序事件中的Watermark
现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁 出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。比如下图,设置延迟时间t为2。
3、并行数据流中的Watermark
在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。
三、设置Watermark的核心代码
1、首先,正确设置事件处理的时间语义,一般都是采用Event Time。
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2、其次,指定生成Watermark的机制,包括:延时处理的时间和EventTime对应的字段。如下:
注意:不管是数据是否有序,都可以使用上面的代码。有序的数据只是无序数据的一种特殊情况。
四、Watermark编程案例
测试数据:基站的手机通话数据,如下:
需求:按基站,每5秒统计通话时间最长的记录。
- StationLog用于封装基站数据
package watermark; //station1,18688822219,18684812319,10,1595158485855 public class StationLog { private String stationID; //基站ID private String from; //呼叫放 private String to; //被叫方 private long duration; //通话的持续时间 private long callTime; //通话的呼叫时间 public StationLog(String stationID, String from, String to, long duration, long callTime) { this.stationID = stationID; this.from = from; this.to = to; this.duration = duration; this.callTime = callTime; } public String getStationID() { return stationID; } public void setStationID(String stationID) { this.stationID = stationID; } public long getCallTime() { return callTime; } public void setCallTime(long callTime) { this.callTime = callTime; } public String getFrom() { return from; } public void setFrom(String from) { this.from = from; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public long getDuration() { return duration; } public void setDuration(long duration) { this.duration = duration; } }
- 代码实现:WaterMarkDemo用于完成计算(注意:为了方便咱们测试设置任务的并行度为1)
package watermark; import java.time.Duration; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; //每隔五秒,将过去是10秒内,通话时间最长的通话日志输出。 public class WaterMarkDemo { public static void main(String[] args) throws Exception { //得到Flink流式处理的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); //设置周期性的产生水位线的时间间隔。当数据流很大的时候,如果每个事件都产生水位线,会影响性能。 env.getConfig().setAutoWatermarkInterval(100);//默认100毫秒 //得到输入流 DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234); stream.flatMap(new FlatMapFunction<String, StationLog>() { public void flatMap(String data, Collector<StationLog> output) throws Exception { String[] words = data.split(","); // 基站ID from to 通话时长 callTime output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4]))); } }).filter(new FilterFunction<StationLog>() { @Override public boolean filter(StationLog value) throws Exception { return value.getDuration() > 0?true:false; } }).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() { @Override public long extractTimestamp(StationLog element, long recordTimestamp) { return element.getCallTime(); //指定EventTime对应的字段 } }) ).keyBy(new KeySelector<StationLog, String>(){ @Override public String getKey(StationLog value) throws Exception { return value.getStationID(); //按照基站分组 }} ).timeWindow(Time.seconds(5)) //设置时间窗口 .reduce(new MyReduceFunction(),new MyProcessWindows()).print(); env.execute(); } } //用于如何处理窗口中的数据,即:找到窗口内通话时间最长的记录。 class MyReduceFunction implements ReduceFunction<StationLog> { @Override public StationLog reduce(StationLog value1, StationLog value2) throws Exception { // 找到通话时间最长的通话记录 return value1.getDuration() >= value2.getDuration() ? value1 : value2; } } //窗口处理完成后,输出的结果是什么 class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> { @Override public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context, Iterable<StationLog> elements, Collector<String> out) throws Exception { StationLog maxLog = elements.iterator().next(); StringBuffer sb = new StringBuffer(); sb.append("窗口范围是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");; sb.append("基站ID:").append(maxLog.getStationID()).append("\t") .append("呼叫时间:").append(maxLog.getCallTime()).append("\t") .append("主叫号码:").append(maxLog.getFrom()).append("\t") .append("被叫号码:") .append(maxLog.getTo()).append("\t") .append("通话时长:").append(maxLog.getDuration()).append("\n"); out.collect(sb.toString()); } }
相关推荐
了解Oracle的审计机制,实现对数据库操作的追踪。 6. **备份与恢复**:学习Oracle的备份策略,如全备、增量备和导出/导入。掌握RMAN(恢复管理器)工具的使用,以及如何在数据丢失或系统故障时进行数据恢复。 7. *...
想要好好地学习Oracle数据库的朋友呀,你错过了她就太不值得了。里面有好多的Oracle操作命令可能你都没接触过吧。好了,话不多多说。坚信资料不错!你,值得拥有!OK.还有,之所有要你2分打赏,是我给了你这么好的...
精通JSP编程 作者赵强 编 12-18节
《精通JSP编程》是赵强先生的一部深入解析JSP技术的专业著作,该书针对JSP编程进行了全面且深入的讲解,旨在帮助读者掌握JSP的核心概念和技术,提升Web应用开发能力。根据提供的文件名列表,我们可以推测书籍的章节...
根据提供的文件信息,我们可以推断出这是一份与Java Server Pages (JSP)相关的学习资料介绍,特别是关于赵强编写的《精通JSP编程》这本书的相关信息。下面将基于这个理解来生成相关知识点。 ### 一、JSP基础概念 ...
5. **多表查询与子查询的选择**:当两者都能实现需求时,尽量选择多表查询,因为它只需要对数据库进行一次操作,效率通常更高。 6. **处理NULL值**:在Oracle中,NULL表示无效、未指定或未知的值,它不等于空格或0...
教程名称:Oracle 数据库赵强视频教程【3天】教程目录:【】Oracle安装与管理、SQL语句(赵强)【】Orcale存储过程jdbc与Orcale大文本操作等(赵强)【】SQL简单查询触发器视图(赵强) 资源太大,传百度网盘了,链接在...
5. 安全性:设置用户角色、权限,实现基于角色的访问控制(RBAC),并可以集成LDAP(Lightweight Directory Access Protocol)进行集中身份验证。 总结,"day2013-0110-webLogic配置和集群(赵强)"这个资料包涵盖了...
图结构数据库则是基于图模型,适合处理复杂关系的数据。 MongoDB是一种面向文档的NoSQL数据库,它采用了类似JSON的格式存储数据,提供了丰富的查询功能和文档更新操作。它的特点在于它的无模式设计,这意味着用户...
基于matlab/simulink的矿井低压电缆绝缘参数在线监测的仿真研究,赵强,王彦文,本文叙述了MATLAB/SIULINK的特点,建立了基于MATLAB/SIMULINK仿真技术的矿井低压电缆传输模型,在此基础上实现了一种基于附加低频信号...
数控机床的直线电机伺服系统研究是工业自动化领域中的一个重要课题,它直接...通过Matlab/Simulink仿真模型和改进的数字滑模控制策略,可以更好地实现数控机床的高精度加工,满足工业自动化对先进制造技术的不断追求。
【标题】"java代码-46 赖赵强"所指的可能是一个关于Java编程的项目或示例,由开发者赖赵强创建。在这个项目中,他可能分享了一段特定的Java代码,用于解决某种问题或者实现一个功能。这个标题暗示了这是一个与Java...
磁盘阵列通过RAID技术实现容量和速度的提升,同时提高数据的可用性和安全性。此外,大规模集群存储如Google的存储系统,以及对等存储(P2P)网络,利用分布式存储的优势,提供大规模、低成本的数据服务。 随着硬件...
- 磁流变阻尼器不需要高压激励电源,相比于传统的液压阻尼器或气动系统,其结构简单,易于在实车上实现。 2. 控制系统设计与仿真: - 文中提到了基于模型的控制策略,这意味着控制器设计是建立在对座椅动力学行为...
《LoadRunner性能测试巧匠训练营》是一本深入讲解LoadRunner性能测试的教材,由赵强和邹伟伟两位专家共同编著。该资源提供的是完整版,且无需密码即可解压阅读,对于想要学习和提升LoadRunner性能测试技能的人来说,...