Flink的SessionWindow怎么用
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎
千家信息网最后更新 2025年12月01日Flink的SessionWindow怎么用
这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎么用"吧!
sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
SessionWindow.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List;/** * @Description sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算 */public class SessionWindow { /** * 遍历集合,返回会话滑动窗口下按不活跃时间切分后的,每个窗口下性别分区里最大年龄数据记录 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置流处理时间事件,对于会话窗口必需设置此时间类型,有三种类型: //1.ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间 //2.IngestionTime:以数据进入flink streaming data flow的时间为准 //3.EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(4); DataStream> inStream = env.addSource(new MyRichSourceFunction()); DataStream> dataStream = inStream.keyBy((KeySelector, String>) k ->k.f1) //按会话窗口滚动,当2秒之内没有指定分区数据流,则计算一次 //会话窗口是根据在指定时间之后没有活跃的数据接入,则认为窗口结束,进行窗口计算 .window(EventTimeSessionWindows.withGap(Time.seconds(2))) .reduce(new ReduceFunction>() { @Override public Tuple3 reduce(Tuple3 t1, Tuple3 t2) throws Exception { //返回年龄最大的 return t1.f2 > t2.f2 ? t1: t2; } }); dataStream.print(); env.execute("flink EventTimeSessionWindows job"); } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichParallelSourceFunction> { @Override public void run(SourceContext> ctx) throws Exception { List> tuple3List = DataSource.getTuple3ToList(); for (Tuple3 tuple3 : tuple3List){ ctx.collect(tuple3); //1秒钟输出一个 Thread.sleep(2 * 1000); } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } }} 打印结果
2> (张三,man,20)4> (李四,girl,24)2> (王五,man,29)4> (刘六,girl,32)2> (吴八,man,30)
感谢各位的阅读,以上就是"Flink的SessionWindow怎么用"的内容了,经过本文的学习后,相信大家对Flink的SessionWindow怎么用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
时间
数据
学习
不同
最大
内容
字段
年龄
环境
示例
类型
切成
处理
输出
事件
就是
应用程序
思路
性别
情况
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
金融科技和金融互联网的区别
flash服务器
服务器代维服务方案
软件开发公司成本怎么核算
泉州bim软件开发工程
2022永久免费的服务器
数据库由什莫组成
ibm服务器死机
胶州安卓软件开发服务公司
怀旧服tbc服务器排行
阿里云海外服务器
苏州管理软件开发机构
湖南同路互联网科技有限公司
网络安全与技术应用
将txt数据导入数据库中
网络安全宣传周云端展
mcpe生存服务器
苹果服务器连接不上付不了款
网络安全与信息化建设的内容
导数据库的数据
网络安全密钥是什么东西啊
安徽本地网络安全公司
网络安全包括纵向加密装置吗
网络安全组织领导构架
计算机网络技术第一版
软件开发的财务估算
苹果备忘录连接服务器
试论网络道德与网络安全(论文)
武汉大学网络安全硕士难度
跟网络安全相关的英文论文