千家信息网

Flink watermark

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,Flink中watermark主要解决保序问题. 而保序问题的根本原因是多个任务同时从流中并行处理数据,顺序无法保证.上游: 生成watermark一般在WINDOW 操作之前生成WATERMARK,
千家信息网最后更新 2025年12月01日Flink watermark

Flink中watermark主要解决保序问题. 而保序问题的根本原因是多个任务同时从流中并行处理数据,顺序无法保证.

上游: 生成watermark
一般在WINDOW 操作之前生成WATERMARK, WATERMARK 有两种:
AssignWithPeriodicWatermarks:
每隔N秒自动向流里注入一个WATERMARK 时间间隔由ExecutionConfig.setAutoWatermarkInterval 决定. 每次调用getCurrentWatermark 方法, 如果得到的WATERMARK 不为空并且比之前的大就注入流中 (emitWatermark)
参考 TimestampsAndPeriodicWatermarksOperator.processElement

AssignWithPunctuatedWatermarks:
基于事件向流里注入一个WATERMARK,每一个元素都有机会判断是否生成一个WATERMARK. 如果得到的WATERMARK 不为空并且比之前的大就注入流中 (emitWatermark)
参考 TimestampsAndPunctuatedWatermarksOperator.processElement

每次生成WATERMARK将覆盖流中已有的WATERMARK

下游: 处理watermark
StatusWatermarkValve 负责将不同Channel 的Watermark 对齐,再传给pipeline 下游,对齐的概念是当前Channel的Watermark时间大于所有Channel最小的Watermark时间

WindowOperator 的处理:
WindowOperator.processElement

  1. WindowAssigner.assignWindows 为当前的消息分配滑动窗口
    常用的有: TumblingEventTimeWindows: 按照消息的 EventTime 分配窗口 (每次生成单个窗口)
    TumblingProcessingTimeWindows 按照当前的时间分配窗口 (每次生成单个窗口)
    需要配合StreamExecutionEnvironment.setStreamTimeCharacteristic 使用 (默认是TimeCharacteristic.ProcessingTime), 这个必须匹配
    否则无法正常触发滑动窗口

实际观察结果:

  • 如果使用ProcessingTimeWindows 即使Event 本身的时间落后于窗口时间很多也会被触发
  • 无论是否使用WATERMARK,窗口中的数据会有乱序,即后到窗口中的数据早于先到窗口中的数据
  • 如果使用EventTimeWindow, 数据和窗口时间对齐不会乱序,同一窗口中的数据不能严格保证顺序,需要SORT.
  • 最后一批数据有缺失,缺失的数据取决于WATERMARK的MAXOUTOFORDERNESS
  • 默认的WATERMARK算法是根据元素的最大时间决定的,当没有新的元素进入流中的时候,水位不再上涨,再减去MAXOUTOFORDERNESS, 则最后一批数据无法落在水位之下,导致WINDOW无法触发
  1. 将当前的滑动窗口和对象加入WindowState, 根据不同的应用场景会使用不同的WindowState. WindowState 的类型由WindowedStream的具体操作决定, 生成对应的StateDescriptor, 不同的WindowState 的 add/get 行为会不同. 比如HeapListWindowState 会把当前的对象追加到currentNamespace (即Timewindow) 对应的List 下. 比如HeapAggregateState 会对当前的对象应用Aggregate function 并更新结果

Window 触发的条件
在 WindowOperator 中有两个点会检查窗口是否触发,两者的检查条件有所不同

  1. processElement 这是在新的流数据进入时触发
    检查条件: watermark时间 >= 窗口最大时间 参见 EventTimeTrigger.onElement
    如果窗口不能被触发则调用InteralTimeService.registerEventTimeTimer 注册一个定时器,以KEY+窗口最大时间为条件触发, 到一定时间后定时器会被自动销毁. 时间为窗口最大时间+WindowOperator.allowedLateness WindowOperator.allowedLateness 可以通过 Stream.window(...).allowedLateness(...) 设置. 一般应该略大于WatermarkGenerator 的 maxOutOfOrderness

  2. onEventTime 或者 onProcessingTime 取决于Watermark的类型, 这是在Watermark更新的时候触发 (InteralTimeService.advanceWatermark). 这时会把当前Watermark 的时间和之前注册的定时器的时间做比较, 如果定时器还存在并且Watermark的时间大于定时器时间则可以触发窗口. 参见 EventTimeTrigger.onEventTime

    参考 http://blog.csdn.net/lmalds/article/details/52704170

WATERMARK和普通数据分开处理
如果一个元素来的过晚 element.getTimestamp + allowedLateness < currentWatermark
会有一个特殊的OutputTag 和正常的流数据区分开
参考 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html

如果窗口来的过晚, window.maxTimestamp + allowedLateness < currentWatermark, 则窗口会被直接丢弃

Watermark 的问题:
默认的Watermark机制是数据驱动的,新的数据进入才会触发水位上升, 而由于maxOutOfOrderness 的存在, watermark < 最大流数据时间 < 当前窗口结束时间
根据之前的分析,最新的时间窗口总是不会被触发,除非更新的数据进入再次提高水位到当前窗口结束时间以后, 如果数据进入的频率低或者没有新的数据进入流,那最新的时间窗口被处理的延时会非常高甚至永远不会被触发,这在实时性要求高的流式系统是很致命的. 比如一个银行系统,要做客户账号层面的保序,每个账号的交易可能一天只有几笔甚至一笔,如果我们在Window 处理的时候KEY BY 账号就会引起上述问题. 我们可以考虑KEY BY的条件改为 HASH(账号) 再取模,然后在窗口处理中再次根据账号分组,这样虽然处理复杂一些,但是保证了窗口中数据的频次

另外一种方案是优化WATERMARK生成的机制,如果一段时间后WATERMARK仍然没有变化,那就将WATERMARK自动上涨一次到当前窗口的结束时间,这样保证窗口处理的延时有个上限

public abstract class AbstractWatermarkGenerator implements AssignerWithPeriodicWatermarks {    private static final long serialVersionUID = -2006930231735705083L;    private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class);    private final long maxOutOfOrderness; // 10 seconds    private long windowSize;    private long currentMaxTimestamp;    private long lastTimestamp = 0;    private long lastWatermarkChangeTime = 0;    private long windowPurgeTime = 0;    public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) {        this.maxOutOfOrderness = maxOutOfOrderness;        this.windowSize = windowSize;    }    public AbstractWatermarkGenerator() {        this(10000, 10000);    }    protected abstract long extractCurTimestamp(T element) throws Exception;    public long extractTimestamp(T element,            long previousElementTimestamp) {        try {            long curTimestamp = extractCurTimestamp(element);            lastWatermarkChangeTime = new Date().getTime();            currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp);            windowPurgeTime = Math.max(windowPurgeTime, getWindowExpireTime(currentMaxTimestamp));            if (logger.isDebugEnabled()) {                logger.debug("Extracting timestamp: {}", currentMaxTimestamp);            }            return curTimestamp;        } catch (Exception e) {            logger.error("Error extracting timestamp", e);              }        return 0;    }    protected long getWindowExpireTime(long currentMaxTimestamp) {        long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize);        long windowEnd = windowStart + windowSize;        return windowEnd + maxOutOfOrderness;    }    public Watermark getCurrentWatermark() {        long curTime = new Date().getTime();        if (currentMaxTimestamp > lastTimestamp) {            if (logger.isDebugEnabled()) {                logger.debug("Current max timestamp has been increased since last");            }            lastTimestamp = currentMaxTimestamp;            lastWatermarkChangeTime = curTime;        }        else {            long diff = windowPurgeTime - currentMaxTimestamp;            if (diff > 0 && curTime - lastWatermarkChangeTime > diff) {                if (logger.isDebugEnabled()) {                    logger.debug("Increase current MaxTimestamp once");                }                currentMaxTimestamp = windowPurgeTime;                lastTimestamp = currentMaxTimestamp;                lastWatermarkChangeTime = curTime;            }        }        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);    }}

实际测试中发现 WATERMARK是否触发和算子的并发度和WATERMARK生成的位置有关
测试结果如下:

  • Env default parallism 10: Source parallism 20, window parallism 6, watermark 生成定义在keyby 之前
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为10, WINDOW SUBTASK 并发度为6, 窗口可以正常触发
  • Env default parallism 20, Source parallism 20, window parallism 6, watermark 生成定义在keyby 之前
    Source 到 WINDOW 算子之前 合成一个SUBTASK,并发度为20, WINDOW SUBTASK 并发度为6, 窗口可以正常触发
  • Env default parallism 60, Source parallism 20, window parallism 10, watermark 生成定义在keyby 之前
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为60,WINDOW SUBTASK 并发度为10, 窗口不能正常触发 (个人理解原因是算子并发度扩大,导致一些CHANNEL处理线程没有数据,根据上文的解释,WATERMARK对齐会取所有CHANNEL最小的WATERMARK,导致水位无法上涨
    可以从FLINK CONSOLE的WATERMARKS看出)
  • Env default parallism 60, Source parallism 20, window parallism 10, watermark 生成定义在Source之后
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为60,WINDOW SUBTASK 并发度为10, 窗口可以正常触发
  • Env default parallism 10, Source parallism 20, window parallism 20, watermark 生成定义在keyby 之前
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为10, WINDOW SUBTASK 并发度为20, 窗口可以正常触发
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定义在keyby 之前
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为30, WINDOW SUBTASK 并发度为20, 窗口不能正常触发
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定义在keyby 之前
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为30, WINDOW SUBTASK 并发度为20, 窗口不能正常触发
  • Env default parallism 30, Source parallism 20, window parallism 20, watermark 生成定义在Source 之后
    Source 为单独的SUBTASK 并发度为20, 之后到WINDOW算子之前合成一个SUBTASK,并发度为30, WINDOW SUBTASK 并发度为20, 窗口可以正常触发

所以注意WINDOW算子之前最好避免让下游算子的并发度超过上游算子,否则就把WATERMARK的生成尽量放到DAG的前端,这样WATERMARK可以被传递到下游算子

时间 数据 生成 算子 处理 不同 最大 定时器 条件 水位 账号 元素 问题 保证 参考 对象 时候 结果 分配 更新 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 bms软件开发工程师薪酬 山西安恒网络安全 数据库原理与应用分析设计题 服务器机箱开机键盘灯亮但不显示 外包软件开发安全协议范本 怎么建立管理邮箱的数据库 本体模型 大数据库 网络安全培训学校学习计划 一路通交通事故管理系统服务器 成都电脑软件开发费用是多少 制作网络安全口号和标语 湖北智能套料软件开发商 虚拟服务器中的服务器管理 安徽跃讯网络技术 李胜是某计算机软件开发公司 信息技术网络安全 mac 终端访问数据库 部队网络安全文字板报 中国网络技术发展史 为什么一直连接服务器登录不了 苹果取消激活锁无法联系服务器 钢结构软件开发 中信建设软件开发 职工网络安全意识教育文案 csv导入数据库大量数据错误 具体的计算机网络技术学习计划 制作网络安全口号和标语 超激斗梦境打开没有服务器 河南华为服务器虚拟化部署 网络安全响应承诺书
0