千家信息网

flink的时间及时区问题怎样解决

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,flink的时间及时区问题怎样解决,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。1.时间纪元所谓的"时间纪元"就是1970年1月1日0时
千家信息网最后更新 2025年12月01日flink的时间及时区问题怎样解决

flink的时间及时区问题怎样解决,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

1.时间纪元

所谓的"时间纪元"就是1970年1月1日0时0分0秒,指的是开始的时间。比如Java类代码:

Date date = new Date(0);

System.out.println(date);

打印出来的结果:

Thu Jan 01 08:00:00 CST 1970

也是1970年1月1日,实际上时分秒是0点0分0秒,这里打印出来的时间是8点而非0点,原因是存在系统时间和本地时间的问题,其实系统时间依然是0点,只不过我们的电脑时区设置为东8区,故打印的结果是8点。

只需要将时区设置为GMT+0,即可打印出0点0分0秒

System.setProperty("user.timezone","GMT+0");

实际上时区问题都是在此时间纪元基础上加/减一定的offset。

2.Flink时间

说java纪元跟本文将的flink时间问题有啥关系呢?

Flink在使用时间的这个概念的时候就是基于时间纪元这个概念的。比如首先,我们的时区是东八区,在我们的视野中UTC-0时间应该加8小时的offset,才是我们看到的时间,所以在使用flink的窗口的时候往往比我们当前的时间少8小时。

还有flink的窗口对其,也是基于纪元时间的。比如下面的有三个窗口函数的例子

1).5min滚动窗口

14:16:391启动的窗口,滚动窗口时间是5min,会发现并不是等待五分钟之后才有结果输出,而是到了14:20:00.0的时候就直接输出结果了。

2).30min滚动窗口

14:27:11启动的滚动窗口,是在14:30:00的时候就直接输出了,而不是等待半小时。

3).1hour滚动窗口

15:54:48启动的一小时的滚动窗口,输出时间是16点整。

时间上差了八小时,但是对齐是基于时间纪元的整数单位。

3.解决差八小时问题

实际在使用的时候flink输出的时差很令人反感,但是没办法flink目前不支持配置时区,但是blink支持,等待着合并吧。

其实,时区问题解决方案比较多吧,要想不伤筋动骨,主要介绍以下三种:

  1. flink端不做处理。也即是在读取数据的时候加上8小时的offset。

  2. 使用udf等算子给时间戳加上8小时的offset。

  3. sink内部做处理。

1).Udf实现

sink端处理

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.TimeZone;
public class UTC2Local extends ScalarFunction { public Timestamp eval(Timestamp s) { long timestamp = s.getTime() + 28800000; return new Timestamp(timestamp); }
}

注册udf

  tEnv.registerFunction("utc2local",new UTC2Local());

使用udf

  Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");

2). sink内部支持

sink端的实现也比较简单,主要是判断输出字段类型,然后加上8小时offset即可。可以参考blink的printtablesink的实现。

  override def invoke(in: JTuple2[JBool, Row]): Unit = {    val sb = new StringBuilder    val row = in.f1    for (i <- 0 to row.getArity - 1) {      if (i > 0) sb.append(",")      val f = row.getField(i)      if (f.isInstanceOf[Date]) {        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd", tz))      } else if (f.isInstanceOf[Time]) {        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss", tz))      } else if (f.isInstanceOf1654075720) {        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime,          "yyyy-MM-dd HH:mm:ss.SSS", tz))      } else {        sb.append(StringUtils.arrayAwareToString(f))      }    }
if (in.f0) { System.out.println(prefix + "(+)" + sb.toString()) } else { System.out.println(prefix + "(-)" + sb.toString()) } }

看完上述内容,你们掌握flink的时间及时区问题怎样解决的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

时间 问题 小时 时区 纪元 时候 输出 结果 实际 是在 处理 支持 内容 原因 就是 方法 更多 概念 系统 伤筋动骨 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 药品广告数据库明兴清开灵 物流软件开发要什么专业的 2014数据库 sql数据库是否一直写硬盘 民航 杰普逊 数据库 oracle数据库 索引 湖南执梦者软件开发有限公司 服务器阵列卡6g跟12g区别 易语言标签如何显示数据库数据 光盘文件无法读取数据库 关于校园网络安全整改报告 商城系统数据库er图 时事新闻课网络安全小知识ppt 沈海高速有多少个服务器 服务器ip访问记录 广电网络技术员考试内容 半神下载软件开发 基于人工智能的网络安全对抗 一码通和服务器有什么区别 互联网的服务器在哪里 数据库中的时间验证规则怎么做 邮箱怎么填写服务器 合肥邮储软件开发中心是国企 怎么删除阿里旺旺服务器的消息 我的世界侏罗纪公园之双人服务器 电子书阅读软件开发方案 计算机实验室里的服务器 长沙软件开发招聘ios 什么情况下玩游戏需要服务器 原神国际服服务器字母
0