flink中的聚合算子是什么
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"flink中的聚合算子是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"flink中的聚合算子是什么"吧!前言flink中的一个
千家信息网最后更新 2025年12月02日flink中的聚合算子是什么
这篇文章主要讲解了"flink中的聚合算子是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"flink中的聚合算子是什么"吧!
前言
flink中的一个接口org.apache.flink.api.common.functions.AggregateFunction,这个类可以接在window流之后,做窗口内的统计计算。
注意:除了这个接口AggregateFunction,flink中还有一个抽象类AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把这个弄混淆了,接口AggregateFunction我们可以理解为flink中的一个算子,和MapFunction、FlatMapFunction等是同级别的,而抽象类AggregateFunction是用于用户自定义聚合函数的,和max、min之类的函数是同级的。
原理解析
比如我们想实现一个类似sql的功能:
select TUMBLE_START(proctime,INTERVAL '2' SECOND) as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)
这个sql就是来统计一下每两秒钟的滑动窗口内每个人出现的次数,今天我们就以这个简单的sql的功能为例讲解一下flink的aggregate算子,其实就是我们用程序来实现这个sql的功能。
首先看一下聚合函数的接口:
@PublicEvolving
public interface AggregateFunction extends Function, Serializable {
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
ACC merge(ACC a, ACC b);
OUT getResult(ACC accumulator);
}
这个接口AggregateFunction里面有4个方法,我们分别来讲解一下。
AggregateFunction这个类是一个泛型类,这里面有三个参数,IN, ACC, OUT。IN就是聚合函数的输入类型,ACC是存储中间结果的类型,OUT是聚合函数的输出类型。 createAccumulator
这个方法首先要创建一个累加器,要进行一些初始化的工作,比如我们要进行count计数操作,就要给累加器一个初始值。add
add方法就是我们要做聚合的时候的核心逻辑,比如我们做count累加,其实就是来一个数,然后就加一。
类似上面的sql的逻辑,我们在写业务逻辑的时候,可以这么想,进入这方法数的数据都是属于某一个用户的,系统在调用这个方法之前会先进行hash分组,然后不同的用户会重复调用这个方法。所以这个函数的入参是IN类型,返回值是ACC类型merge
因为flink是一个分布式计算框架,可能计算是分布在很多节点上同时进行的,比如上述的add操作,可能同一个用户在不同的节点上分别调用了add方法在本地节点对本地的数据进行了聚合操作,但是我们要的是整个结果,整个时候,我们就需要把每个用户各个节点上的聚合结果merge一下,整个merge方法就是做这个工作的,所以它的入参和出参的类型都是中间结果类型ACC。getResult
这个方法就是将每个用户最后聚合的结果经过处理之后,按照OUT的类型返回,返回的结果也就是聚合函数的输出结果了。
实例讲解
自定义source
首先我们自定义source生成用户的信息
public static class MySource implements SourceFunction>{
private volatile boolean isRunning = true;
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};
@Override
public void run(SourceContext> ctx) throws Exception{
while (isRunning){
Thread.sleep(10);
String userid = userids[(int) (Math.random() * (userids.length - 1))];
ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
}
}
@Override
public void cancel(){
isRunning = false;
}
}
自定义聚合函数
public static class CountAggregate
implements AggregateFunction,Integer,Integer>{
@Override
public Integer createAccumulator(){
return 0;
}
@Override
public Integer add(Tuple2 value, Integer accumulator){
return ++accumulator;
}
@Override
public Integer getResult(Integer accumulator){
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b){
return a + b;
}
}
自定义结果输出函数
/**
* 这个是为了将聚合结果输出
*/
public static class WindowResult
implements WindowFunction,Tuple,TimeWindow>{
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable input,
Collector> out) throws Exception{
String k = ((Tuple1) key).f0;
long windowStart = window.getStart();
int result = input.iterator().next();
out.collect(Tuple3.of(k, new Date(windowStart), result));
}
}
主流程
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream> dataStream = env.addSource(new MySource());
dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.aggregate(new CountAggregate(), new WindowResult()
).print();
env.execute();
感谢各位的阅读,以上就是"flink中的聚合算子是什么"的内容了,经过本文的学习后,相信大家对flink中的聚合算子是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
函数
结果
就是
方法
类型
用户
算子
接口
节点
输出
功能
时候
逻辑
学习
不同
内容
同级
数据
累加器
自定
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
多措并举提升网络安全
广州易双网络技术有限公司图片
网络安全osce
魔兽世界服务器公会
大理软件开发专业培训
是构成数据库的基本单元
通达信高级行情服务器地址更新
德国电信网络技术
数据库数据回写进文件
销售管理系统数据库设计
北京软件开发工资一般多
网络安全对于国家重要性
什么叫nds服务器
阿里巴巴传统数据库
寻仙服务器名称
班马网络技术有限公司
长沙市网络安全学习
网络安全总监cio
云端数据库设计入口
sql 数据库项目
网络安全你我他手
数据库数据一致性检验方法
php怎么显示数据库的表
数据库 访问 中间件
软件开发ba是什么
近代农业数据库
法院网络安全演讲稿
上海软件开发生产过程
应聘58的网络安全师王鑫
植入数据库