Flink中Watermarks怎么用
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Watermarks水印:为输入的数据流的设置一个时
千家信息网最后更新 2025年12月03日Flink中Watermarks怎么用
这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案
示例环境
java.version: 1.8.xflink.version: 1.11.1
TimestampsAndWatermarks.java
import com.flink.examples.DataSource;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import java.util.Date;import java.util.Iterator;import java.util.List;/** * @Description Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案 */public class TimestampsAndWatermarks { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html */ /** * 遍历集合,分别打印不同性别的信息,对于执行超时,自动触发定时器 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* TimeCharacteristic有三种时间类型: ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间; IngestionTime:以数据进入flink streaming data flow的时间为准; EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段;需要实现assignTimestampsAndWatermarks方法,并设置时间水位线; */ //使用event time,需要指定事件的时间戳 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); //设置自动生成水印的时间周期,避免数据流量大的情况下,频繁添加水印导致计算性能降低。 env.getConfig().setAutoWatermarkInterval(1000L); List> tuple3List = DataSource.getTuple3ToList(); DataStream> inStream = env.addSource(new MyRichSourceFunction()); DataStream> dataStream = inStream //为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。 //Duration.ofSeconds(2),到数据流到达flink后,再水位线中设置延迟时间,也就是在所有数据流的最大的事件时间比window窗口结束时间大或相等时,再延迟多久触发window窗口结束;// .assignTimestampsAndWatermarks(// WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2))// .withTimestampAssigner((element, timestamp) -> {// long times = System.currentTimeMillis() ;// System.out.println(element.f1 + ","+ element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));// return times;// })// ) .assignTimestampsAndWatermarks(new MyWatermarkStrategy() .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple3 element, long timestamp) { long times = System.currentTimeMillis(); System.out.println(element.f1 + "," + element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); return times; } })) //分区窗口 .keyBy((KeySelector, String>) k -> k.f1) //触发3s滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(3))) //执行窗口数据,对keyBy数据流批量处理 .apply(new WindowFunction, Tuple2, String, TimeWindow>(){ @Override public void apply(String s, TimeWindow window, Iterable> input, Collector> out) throws Exception { long times = System.currentTimeMillis() ; System.out.println(); System.out.println("窗口处理时间:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); Iterator> iterator = input.iterator(); int total = 0; int size = 0; String sex = ""; while (iterator.hasNext()){ Tuple3 tuple3 = iterator.next(); total += tuple3.f2; size ++; sex = tuple3.f1; } out.collect(new Tuple2<>(sex, total / size)); } }); dataStream.print(); env.execute("flink Filter job"); } /** * 定期水印生成器 */ public static class MyWatermarkStrategy implements WatermarkStrategy>{ @Override public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator>() { //设置固定的延迟量3.5 seconds private final long maxOutOfOrderness = 3500; private long currentMaxTimestamp; /** * 事件处理 * @param event 数据流对象 * @param eventTimestamp 事件水位线时间 * @param output 输出 */ @Override public void onEvent(Tuple3 event, long eventTimestamp, WatermarkOutput output) { currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { // 拿上一个水印时间 - 延迟量 = 等于给的窗口最终数据最后时间(如果在窗口到期内,未发生新的水印事件,则按window正常结束时间计算,当在最后水印时间-延迟量的时间范围内,有新的数据流进入,则会重新触发窗口内对全部数据流计算) output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1)); } }; } } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichSourceFunction> { @Override public void run(SourceContext> ctx) throws Exception { List> tuple3List = DataSource.getTuple3ToList(); int j = 0; for (int i=0;i<100;i++){ if (i%6 == 0){ j=0; } ctx.collect(tuple3List.get(j)); //1秒钟输出一个 Thread.sleep(1 * 1000); j ++; } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } }} 打印结果
man,张三的水位线为:2020-12-27 10:28:20girl,李四的水位线为:2020-12-27 10:28:21man,王五的水位线为:2020-12-27 10:28:22girl,刘六的水位线为:2020-12-27 10:28:23girl,伍七的水位线为:2020-12-27 10:28:24窗口处理时间:2020-12-27 10:28:25(man,20)man,吴八的水位线为:2020-12-27 10:28:25man,张三的水位线为:2020-12-27 10:28:26girl,李四的水位线为:2020-12-27 10:28:27窗口处理时间:2020-12-27 10:28:28(girl,28)窗口处理时间:2020-12-27 10:28:28(man,29)
关于"Flink中Watermarks怎么用"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
时间
数据
水位
水位线
数据流
水印
事件
处理
延迟
输入
篇文章
输出
字段
方案
更多
解决方案
张三
李四
生成
不同
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
挂端服务器
运维部网络安全措施
数据库求反
流放者柯南清除服务器设置
主流数据库论文检索
搭建云端奥维企业服务器
财源阁网络技术有限公司
我的世界都有大陆服务器下载地址
电影服务器租用
软件开发过程有哪些信息需求
广元gpu云服务器生产厂家
中兴深圳软件开发怎么样
你的世界服务器透视
北京读我网络技术有限公司官网
网络安全法实施时间颁布号
数据库管理员最基本
北京乐游畅想软件开发公司
100台服务器需要几个机柜
安卓软件开发应用
分区表修复软件开发
网络安全为人民靠人民的画
应用网络安全论坛
中国计算机网络安全管理
2019网络安全论坛 郭
车载导航软件开发介绍
数据库的安全
网信办网络安全审查办
服务器远程连接输入什么
软件开发的标准有哪些问题
服务器wordpress教程