Flink Join怎么使用
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要讲解了"Flink Join怎么使用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Join怎么使用"吧!Join算子:两个数据
千家信息网最后更新 2025年12月03日Flink Join怎么使用
这篇文章主要讲解了"Flink Join怎么使用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Join怎么使用"吧!
Join算子:两个数据流通过内部相同的key分区,将窗口内两个数据流相同key数据元素计算后,合并输出(类似于mysql表的inner join操作)
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
Join.java
package com.flink.examples.functions;import com.flink.examples.DataSource;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatJoinFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Arrays;import java.util.List;/** * @Description Join算子:两个数据流通过内部相同的key分区,将窗口内两个数据流相同key数据元素计算后,合并输出(类似于mysql表的inner join操作) */public class Join { /** * Flink支持了两种Join:Window Join(窗口连接)和Interval Join(时间间隔连接),本示例演示的为Window Join * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/joining.html */ /** * 两个数据流集合,对相同key进行内联,分配到同一个窗口下,合并并打印 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// //watermark 自动添加水印调度时间// env.getConfig().setAutoWatermarkInterval(200); List> tuple3List1 = DataSource.getTuple3ToList(); List> tuple3List2 = Arrays.asList( new Tuple3<>("伍七", "girl", 18), new Tuple3<>("吴八", "man", 30) ); //Datastream 1 DataStream> dataStream1 = env.fromCollection(tuple3List1) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((element, timestamp)->System.currentTimeMillis())); //Datastream 2 DataStream> dataStream2 = env.fromCollection(tuple3List2) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple3 element, long timestamp) { return System.currentTimeMillis(); } })); //Datastream 3 DataStream newDataStream = dataStream1.join(dataStream2) .where(new KeySelector, String>() { @Override public String getKey(Tuple3 value) throws Exception { System.out.println("first name:" + value.f0 + ",sex:" + value.f1); return value.f1; } }) .equalTo(new KeySelector, String>() { @Override public String getKey(Tuple3 value) throws Exception { System.out.println("second name:" + value.f0 + ",sex:" + value.f1); return value.f1; } }) .window(TumblingEventTimeWindows.of(Time.seconds(1)) .apply(new FlatJoinFunction, Tuple3, String>() { @Override public void join(Tuple3 first, Tuple3 second, Collector out) throws Exception { out.collect(first.f0 + "|" + first.f1 + "|" + first.f2 + "|" + second.f0 + "|" + second.f1 + "|" + second.f2); } }) ; newDataStream.print(); env.execute("flink Join job"); }} 打印结果
4> 李四|girl|24|伍七|girl|184> 刘六|girl|32|伍七|girl|184> 伍七|girl|18|伍七|girl|182> 张三|man|20|吴八|man|302> 王五|man|29|吴八|man|302> 吴八|man|30|吴八|man|30
感谢各位的阅读,以上就是"Flink Join怎么使用"的内容了,经过本文的学习后,相信大家对Flink Join怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
时间
相同
两个
数据流
水印
示例
学习
事件
元素
内容
环境
算子
输出
官方
就是
思路
情况
数据源
文档
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发的面试题笔试
珠海pc软件开发市场价
10万用户 服务器
读研期间学软件开发
服务器存储网络设备的原理
科技互联网技术论文
opendns服务器
a新打开页面怎么展示数据库
软件开发流程和设计模式
动态心电图 数据库
网络安全知识主题班会会议记录
学生如何防范网络安全诈骗图片
数据库怎么查建表时间查询
云平台的网络安全防护
将自己的云服务器作为图片
平板电脑装阿里云服务器流程
华为软件开发云搭建教程
济南互联网络科技有限公司
网络技术处理员考试
软件开发公司找项目
ibm服务器保修
湖北信息化软件开发价钱
济南信息安全技术提升网络安全
网络安全尽职免责
云锁服务器高级防护
大厂 服务器系统
网络安全5分钟的自我介绍
量化交易软件开发实战
日常网络安全预警机制
云服务器公用安全性保障