Flink的CoGroup如何使用
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要介绍"Flink的CoGroup如何使用",在日常操作中,相信很多人在Flink的CoGroup如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Fli
千家信息网最后更新 2025年12月01日Flink的CoGroup如何使用
这篇文章主要介绍"Flink的CoGroup如何使用",在日常操作中,相信很多人在Flink的CoGroup如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink的CoGroup如何使用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
CoGroup算子:将两个数据流按照key进行group分组,并将数据流按key进行分区的处理,最终合成一个数据流(与join有区别,不管key有没有关联上,最终都会合并成一个数据流)
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
CoGroup.java
package com.flink.examples.functions;import com.flink.examples.DataSource;import com.google.gson.Gson;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.CoGroupFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Arrays;import java.util.List;/** * @Description CoGroup算子:将两个数据流按照key进行group分组,并将数据流按key进行分区的处理,最终合成一个数据流(与join有区别,不管key有没有关联上,最终都会合并成一个数据流) */public class CoGroup { /** * 两个数据流集合,对相同key进行内联,分配到同一个窗口下,合并并打印 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //watermark 自动添加水印调度时间 //env.getConfig().setAutoWatermarkInterval(200); List> tuple3List1 = DataSource.getTuple3ToList(); List> tuple3List2 = Arrays.asList( new Tuple3<>("伍七", "girl", 18), new Tuple3<>("吴八", "man", 30) ); //Datastream 1 DataStream> dataStream1 = env.fromCollection(tuple3List1) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((element, timestamp) -> System.currentTimeMillis())); //Datastream 2 DataStream> dataStream2 = env.fromCollection(tuple3List2) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple3 element, long timestamp) { return System.currentTimeMillis(); } }) ); //对dataStream1和dataStream2两个数据流进行关联,没有关联也保留 //Datastream 3 DataStream newDataStream = dataStream1.coGroup(dataStream2) .where(new KeySelector, String>() { @Override public String getKey(Tuple3 value) throws Exception { return value.f1; } }) .equalTo(t3->t3.f1) .window(TumblingEventTimeWindows.of(Time.seconds(1))) .apply(new CoGroupFunction, Tuple3, String>() { @Override public void coGroup(Iterable> first, Iterable> second, Collector out) throws Exception { StringBuilder sb = new StringBuilder(); Gson gson = new Gson(); //datastream1的数据流集合 for (Tuple3 tuple3 : first) { sb.append(gson.toJson(tuple3)).append("\n"); } //datastream2的数据流集合 for (Tuple3 tuple3 : second) { sb.append(gson.toJson(tuple3)).append("\n"); } out.collect(sb.toString()); } }); newDataStream.print(); env.execute("flink CoGroup job"); }} 打印结果
{"f0":"张三","f1":"man","f2":20}{"f0":"王五","f1":"man","f2":29}{"f0":"吴八","f1":"man","f2":30}{"f0":"吴八","f1":"man","f2":30}{"f0":"李四","f1":"girl","f2":24}{"f0":"刘六","f1":"girl","f2":32}{"f0":"伍七","f1":"girl","f2":18}{"f0":"伍七","f1":"girl","f2":18}到此,关于"Flink的CoGroup如何使用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
数据
数据流
时间
水印
两个
关联
学习
事件
更多
环境
示例
算子
并将
并成
分组
处理
帮助
实用
相同
接下来
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
医疗网络安全与数据保护
土豆王国服务器要来到
机关网络安全措施
学软件开发需要掌握的几个点
关系型数据库复杂查询面试题
关注网络安全心得体会
计算机网络技术还要考试吗
数据中心网络安全策略
销售单价数据库设计
怎么保证微信数据库数据同步
云服务器挂载硬盘
新加坡银行网络安全吗
9月11日天津网络安全大赛
连接不上视频监控管理服务器
安卓studio数据库异常
网络安全 网络信任
电力互联网科技部
北京专业软件开发定制
软件仓库数据库
底层软件开发属于哪一专业
中央政法网络安全
云服务器防护设备
公司电脑网络安全管理方案
瑞士软件开发人员数量
软件开发程序规划
承载网络技术支撑传输专业
凤县租房软件开发
网络技术 考点
网络安全宣传进校园心得体会
dos攻击服务器犯法吗