Flink的函数有哪些
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. Map: 将数据流中的数据进行一个转化,形
千家信息网最后更新 2025年12月03日Flink的函数有哪些
这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素
具体代码实现
package com.wudl.core;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName WordMap * @Description TODO map 算子实例 * @Date 2020/10/29 10:15 */public class WordMap { /** * @param args * Map 函数的用法 * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素 *参数: Lambda 表达式或者,new MapFunction实现类 * 返回值:DataStream */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(1); env.socketTextStream("10.204.125.140", 8899) .map(new MapFunction() { @Override public String map(String s) throws Exception { String[] split = s.split(","); return split[0] + "---" + split[1]; } }).print(); env.execute(); }} 2. FlatMap:
将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;import java.util.List;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFlatMap * @Description TODO FlatMap * * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素 * * * * @Author wudl * @Date 2020/10/29 10:46 * * * 函数 FlatMap * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素 * 参数: lambda 表达式或者是FlatFunction的实现类 * 返回值:DataStream * * * */public class TransformFlatMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);// DataStreamSource> listDs = env.fromCollection(Arrays.asList(// Arrays.asList(1, 2, 3),// Arrays.asList(3, 4, 5),// Arrays.asList(8,9,0)// ));// listDs.flatMap(new FlatMapFunction, Integer>() {// @Override// public void flatMap(List list, Collector collector) throws Exception {//// for (Integer number : list) {// collector.collect(number + 100);// }//// }// }).print(); DataStreamSource strDs = env.socketTextStream("10.204.125.140", 8899); strDs.flatMap(new FlatMapFunction() { @Override public void flatMap(String s, Collector collector) throws Exception { String[] split = s.split(","); collector.collect(split[0]+split[1]); } }).print(); env.execute(); }}
第三种:Filter 对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃
package com.wudl.core;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFilter * @Description TODO 流的过滤 * @Date 2020/11/5 10:26 */public class TransformFilter { /** * 函数中Filter 中过滤 * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃 * 返回值:DataStream */ public static void main(String[] args) throws Exception { //1.获取上下文的环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.设置并行度 env.setParallelism(1); //3.获取数据流 DataStreamSource SourceDs = env.socketTextStream("10.204.125.140", 8899); //4. 过滤数据流 DataStream filter = SourceDs.filter(new FilterFunction() { @Override public boolean filter(String value) throws Exception { String[] split = value.split(","); return split[1].length() > 3; } }); filter.print(); env.execute(); }} 感谢你能够认真阅读完这篇文章,希望小编分享的"Flink的函数有哪些"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
数据
元素
数据流
函数
消费
条件
篇文章
个体
多个
整体
参数
表达式
规则
上下
上下文
代码
价值
兴趣
同时
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
时序数据库详解和使用
软件开发公司使用的框架
计算机网络技术怎么介绍这个专业
无线传感网络技术与应用图片
怎么上传到云服务器
怎么做服务器文件
纽卡斯尔网络安全好毕业吗
60台电脑服务器多少钱
宇宙服务器
在服务器上配置 git
数据库大作业案例动物园
达梦数据库查询授权日期
怎么给数据库的表添加信息
漫画说网络安全法40
福建服务器机柜物理机
数据库查询表的行数
山东个性化软件开发价格走势
orcl数据库导表输入
我的世界鬼灭之刃服务器连接
网络安全法宣传提示
网络安全知识大赛主题词
网络安全警报可以吊销信息吗
数据库没有有效分析步数据
山东餐饮软件开发
网络技术有限公司的名称
网络安全背诵
天玥 服务器
网络安全与管理有那些方面
orcl数据库导表输入
政府数据库建设