Flink Aggregate怎么用
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容主要讲解"Flink Aggregate怎么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink Aggregate怎么用"吧!Aggreg
千家信息网最后更新 2025年12月01日Flink Aggregate怎么用
本篇内容主要讲解"Flink Aggregate怎么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink Aggregate怎么用"吧!
Aggregate算子:提供基于事件窗口进行增量计算的函数。(对输入窗口每个数据流元素递增聚合计算,并将窗口状态与窗口内元素保持在累加器中)
示例环境
java.version: 1.8.xflink.version: 1.11.1
Aggregate.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.accumulators.AverageAccumulator;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;/** * @Description Aggregate算子:提供基于事件窗口进行增量计算的函数。(对输入窗口每个数据流元素递增聚合计算,并将窗口状态与窗口内元素保持在累加器中) */public class Aggregate { /** * 遍历集合,分别打印不同性别的总人数与平均值 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Tuple3<姓名,性别(man男,girl女),年龄> List> tuple3List = DataSource.getTuple3ToList(); DataStream dataStream = env.fromCollection(tuple3List) .keyBy((KeySelector, String>) k -> k.f1) //按数量窗口滚动,每3个输入窗口数据流,计算一次 .countWindow(3) //只能基于Windowed窗口Stream进行调用 .aggregate(new AggregateFunction, MyAverageAccumulator, MyAverageAccumulator>() { /** * 创建新累加器,开始聚合计算 * @return */ @Override public MyAverageAccumulator createAccumulator() { return new MyAverageAccumulator(); } /** * 将窗口输入的数据流值添加到窗口累加器,并返回新的累加器值 * @param tuple3 * @param accumulator * @return */ @Override public MyAverageAccumulator add(Tuple3 tuple3, MyAverageAccumulator accumulator) { System.out.println("tuple3:" + tuple3.toString()); accumulator.setGender(tuple3.f1); //此accumulator保含个数统计和值累计两个属性,add方法内会计算窗口内总数与求和 accumulator.add(tuple3.f2); return accumulator; } /** * 获取累加器聚合结果 * @param accumulator * @return */ @Override public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) { return accumulator; } /** * 合并两个累加器,返回合并后的累加器的状态 * @param a * @param b * @return */ @Override public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) { a.merge(b); return a; } }); dataStream.print(); env.execute("flink Filter job"); } /** * 添加性别属性(此类用于显示不同性别的平均值) */ public static class MyAverageAccumulator extends AverageAccumulator{ private String gender; public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } @Override public String toString() { //继承父类的this.getLocalValue()方法用于计算并返回平均值 return super.toString() + ", gender to " + gender; } }} 打印结果
tuple3:(张三,man,20)tuple3:(李四,girl,24)tuple3:(刘六,girl,32)tuple3:(王五,man,29)tuple3:(伍七,girl,18)tuple3:(吴八,man,30)4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man
到此,相信大家对"Flink Aggregate怎么用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
累加器
元素
数据
数据流
输入
平均值
性别
方法
状态
不同
两个
事件
内容
函数
增量
属性
算子
结果
并将
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库组件基于什么语言
软件开发有关费用
中外文数据库中
军队网络安全创意视频
南岸计算机网络技术职业学校
软件开发模型发展前景
服务器屏幕出现方格
泰安网络安全应急支撑单位
怎么将文件上传到数据库
软件开发行业介绍与就业
广东常规软件开发市价
服务器标签
图片数据库解决的问题
计算机通信网络技术书籍
链家成交 数据库下载
为什么视频连接不上服务器
panabit缓存服务器
服务器开机后显示桌面
安全狗云服务器
ps4原神什么服务器
翻译外文的数据库
中国邮政总公司软件开发中心
数据库中存在数据冗余吗
韩国女vs日本女数据库
网络安全到底是什么
mysql数据库服务端链接配置
360服务器安全检测
服务器excel免费版
有哪些数据库工具
python 买服务器