Flink中Transform怎么用
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下Flink中Transform怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!分组聚合 Strin
千家信息网最后更新 2025年12月02日Flink中Transform怎么用
小编给大家分享一下Flink中Transform怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
分组聚合
String path = "E:\\GIT\\flink-learn\\flink-learn\\telemetering.txt"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); TupleTypeInfo> typeInfo = new TupleTypeInfo<>(Types.STRING, Types.DOUBLE, Types.LONG); TupleCsvInputFormat > tupleCsvInputFormat = new TupleCsvInputFormat<>(new Path(path), typeInfo); DataStreamSource > dataStreamSource = env.createInput(tupleCsvInputFormat, typeInfo); //或 DataStreamSource > dataStreamSource = env.readFile(tupleCsvInputFormat, path); SingleOutputStreamOperator > operator = dataStreamSource .filter(Objects::nonNull)// .map()// .flatMap()// .keyBy(0) .keyBy(tuple -> tuple.f0) .minBy(1);// .min()// .max(1);// .maxBy(1, false);// .sum(1);// .reduce();// .process(); operator.print().setParallelism(1); env.execute();
分流/合流
String path = "E:\\GIT\\flink-learn\\flink-learn\\telemetering.txt"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); PojoTypeInfotypeInfo = (PojoTypeInfo ) Types.POJO(TelemeterDTO.class); PojoCsvInputFormat inputFormat = new PojoCsvInputFormat<>(new Path(path), typeInfo, new String[]{"code", "value", "timestamp"}); DataStreamSource dataStreamSource = env.createInput(inputFormat, typeInfo); //分流 SplitStream splitStream = dataStreamSource .split(item -> { if (item.getValue() > 100) { return Collections.singletonList("high"); } return Collections.singletonList("low"); }); DataStream highStream = splitStream.select("high"); DataStream lowStream = splitStream.select("low"); //合流 ConnectedStreams connectedStreams = lowStream.connect(highStream);// DataStream unionDataStream = lowStream.union(highStream); //需要类型一致 SingleOutputStreamOperator > operator = connectedStreams .map(new CoMapFunction >() { @Override public Tuple3 map1(TelemeterDTO value) { return Tuple3.of(value.getCode(), value.getValue(), value.getTimestamp()); } @Override public Tuple3 map2(TelemeterDTO value) { return Tuple3.of(value.getCode(), value.getValue(), value.getTimestamp()); } }); operator.print(); env.execute();
UDF函数,提供底层支持
MapFunction
FilterFunction
ReduceFunction
ProcessFunction
SourceFunction
SinkFunction
富函数
富函数 包含了生命周期,及上下文相关信息,如
open() 可以在算子创建之初建立数据库连接
close() 在在算子生命结束之前关闭资源
以上是"Flink中Transform怎么用"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
函数
篇文章
内容
生命
算子
合流
一致
上下
上下文
不怎么
信息
周期
大部分
底层
数据
数据库
更多
知识
类型
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发有那么累吗
sr860服务器
网络安全国家认证证书
gtalk服务器
做网络技术账务
nfs 挂载服务器失败
网络安全哪年开始
程序流程图软件开发
邵阳市网络安全宣传周活动
公安网络安全宣传图
nginx对服务器的要求
服务器查询目录使用率
末日之刃服务器人口普查2022
免费视觉软件开发包下载
软件安装在服务器快还是本地快
链接服务器出现问题
小洲科技互联网
服务器收费什么意思
腾讯云海外服务器免费更换IP
网络数据库技术工作原理
杭州湾区腾讯互联网科技大厦
web服务器线程池管理
什么是数据库的恢复原则
济南蓝泰网络技术
服务器部署mysql
网络服务器安全操作规程
花神南京互联网科技有限公司
衣二三网络技术有限公司
服务器直接连网线远程管理口
软件开发费用 怎么做账