如何实现Apache Flink中Flink数据流转换
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇文章给大家分享的是有关如何实现Apache Flink中Flink数据流转换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Opera
千家信息网最后更新 2025年12月01日如何实现Apache Flink中Flink数据流转换
本篇文章给大家分享的是有关如何实现Apache Flink中Flink数据流转换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
Operators操作转换一个或多个DataStream到一个新的DataStream 。
filter function
Scala
object DataStreamTransformationApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment filterFunction(env) env.execute("DataStreamTransformationApp") } def filterFunction(env: StreamExecutionEnvironment): Unit = { val data=env.addSource(new CustomNonParallelSourceFunction) data.map(x=>{ println("received:" + x) x }).filter(_%2 == 0).print().setParallelism(1) }}数据源选择之前的任意一个数据源即可。
这里的map中没有做任何实质性的操作,filter中将所有的数都对2取模操作,打印结果如下:
received:1received:22received:3received:44received:5received:66received:7received:88
说明map中得到的所有的数据,而在filter中进行了过滤操作。
Java
public static void filterFunction(StreamExecutionEnvironment env) { DataStreamSource data = env.addSource(new JavaCustomParallelSourceFunction()); data.setParallelism(1).map(new MapFunction() { @Override public Long map(Long value) throws Exception { System.out.println("received:"+value); return value; } }).filter(new FilterFunction() { @Override public boolean filter(Long value) throws Exception { return value % 2==0; } }).print().setParallelism(1); } 需要先使用data.setParallelism(1)然后再进行map操作,否则会输出多次。因为我们用的是JavaCustomParallelSourceFunction(),而当我们使用JavaCustomNonParallelSourceFunction时,默认就是并行度1,可以不用设置。
Union Function
Scala
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment// filterFunction(env) unionFunction(env) env.execute("DataStreamTransformationApp") } def unionFunction(env: StreamExecutionEnvironment): Unit = { val data01 = env.addSource(new CustomNonParallelSourceFunction) val data02 = env.addSource(new CustomNonParallelSourceFunction) data01.union(data02).print().setParallelism(1) }Union操作将两个数据集综合起来,可以一同处理,上面打印输出如下:
11223344
Java
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// filterFunction(environment); unionFunction(environment); environment.execute("JavaDataStreamTransformationApp"); } public static void unionFunction(StreamExecutionEnvironment env) { DataStreamSource data1 = env.addSource(new JavaCustomNonParallelSourceFunction()); DataStreamSource data2 = env.addSource(new JavaCustomNonParallelSourceFunction()); data1.union(data2).print().setParallelism(1); } Split Select Function
Scala
split可以将一个流拆成多个流,select可以从多个流中进行选择处理的流。
def splitSelectFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) val split = data.split(new OutputSelector[Long] { override def select(value: Long): lang.Iterable[String] = { val list = new util.ArrayList[String]() if (value % 2 == 0) { list.add("even") } else { list.add("odd") } list } }) split.select("odd","even").print().setParallelism(1) }可以根据选择的名称来处理数据。
Java
public static void splitSelectFunction(StreamExecutionEnvironment env) { DataStreamSource data = env.addSource(new JavaCustomNonParallelSourceFunction()); SplitStream split = data.split(new OutputSelector() { @Override public Iterable select(Long value) { List output = new ArrayList<>(); if (value % 2 == 0) { output.add("odd"); } else { output.add("even"); } return output; } }); split.select("odd").print().setParallelism(1); } 以上就是如何实现Apache Flink中Flink数据流转换,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
数据
多个
处理
选择
数据流
就是
数据源
更多
知识
篇文章
输出
实用
不用
两个
中将
名称
实质
实质性
工作会
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
河南通用软件开发零售价格
云天宫可以转的服务器
宜兴一站式软件开发诚信合作
如何卸载access数据库
战地5社区服务器多吗
数据库中邮政编码
岳塘区网络安全
象山计算机软件开发项目
武汉夲地宝网络技术有限公司
网络技术发展看法
网络安全周红衣郑州
民生银行 软件开发中心
网络安全哪三种
wcf服务器
龙岩微力量网络技术有限公司
网络技术 考点
ei数据库使用指南
网站服务器价的格
视频服务器管理系统
淮安华科网络技术有限公司
海南互联网科技企业孵化器
软件测试开发与软件开发
网络安全法 不实图文
重庆服务器防火墙直供
忻州网络技术参考价格
qq邮件与服务器
如何搭设安全防护服务器
网络安全等保定级
2018公需培训网络安全
浙江学校时钟监控网关服务器