How to Split a Stream in Flink with Side Outputs (SideOutput vs. split)
Published: 2025-12-02  Author: 千家信息网 editor
Last updated by 千家信息网: 2025-12-02
This article explains how to split a stream in Flink using side outputs. The explanation is kept simple and easy to follow, so work through it step by step.
Versions:
Environment: Windows
Scala: 2.11.8
Flink: 1.10.1
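Most DataStream API operators have a single output, that is, one stream of a single data type.
The exception is the `split` operator, which divides one stream into several streams that all share the same data type. (`split` is deprecated in favor of side outputs and was removed in Flink 1.12.)
The side-output feature of process functions can also produce multiple streams (the recommended approach since Flink 1.9), and unlike `split`, these streams can have different data types. A side output is defined as an `OutputTag[X]` object, where `X` is the data type of that output stream. A process function emits an event to one or more side outputs through its `Context` object.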
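To make the "different data types" point concrete before looking at the Flink job below, here is a plain-Scala analogy that uses ordinary collections instead of Flink: the main output keeps the input type `SensorReading`, while the two side outputs carry `(String, Double)` tuples, which `split` cannot express. `SideOutputModel`, `Routed` and `route` are hypothetical names for illustration only; the thresholds (< 10 and > 70) are taken from `TempStageProcess` in the job.

```scala
// Plain-Scala analogy (no Flink involved): one input type, three outputs,
// where the two "side outputs" use a different element type than the main output.
case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object SideOutputModel {
  // Hypothetical result holder: main keeps SensorReading, side outputs hold (deviceNo, temperature)
  final case class Routed(
      main: Vector[SensorReading],
      low: Vector[(String, Double)],
      high: Vector[(String, Double)])

  // Same rule as TempStageProcess: < 10 goes to "low", > 70 goes to "high", the rest stays in main
  def route(readings: Seq[SensorReading]): Routed =
    readings.foldLeft(Routed(Vector.empty, Vector.empty, Vector.empty)) { (acc, r) =>
      if (r.temperature < 10) acc.copy(low = acc.low :+ ((r.deviceNo, r.temperature)))
      else if (r.temperature > 70) acc.copy(high = acc.high :+ ((r.deviceNo, r.temperature)))
      else acc.copy(main = acc.main :+ r)
    }
}
```

For example, routing 设备8 at 84.3, 设备7 at 9.79 and 设备2 at 10.0 puts the first reading in `high`, the second in `low` and the third in `main`. The comparison is strict, so a reading of exactly 10.0 stays in the main output, which matches `main> SensorReading(设备2,1610037403367,10.0)` in the run output.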
```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
 * @param deviceNo    device number
 * @param timestamp   timestamp
 * @param temperature temperature
 */
case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object SensorReadingSplitStreaming {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Time semantics: event time (not required here, since no windows or timers are used)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the test data from a local text file
    val fileSource: DataStream[String] = env.readTextFile("D:\\tmp\\file1.txt")
    val mapStream: DataStream[SensorReading] = fileSource
      .map(data => {
        val split: Array[String] = data.split(",")
        SensorReading(split(0).trim, split(1).trim.toLong, split(2).trim.toDouble)
      })

    // Split the stream: normal readings stay in the main output, low/high readings go to side outputs
    val tmpStageStream: DataStream[SensorReading] = mapStream.process(new TempStageProcess())
    tmpStageStream.print("main")

    // Fetch each side output with the same OutputTag instance the process function emits to
    val lowStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(TempStageProcess.lowTmp)
    val highStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(TempStageProcess.highTmp)
    lowStream.print("low")
    highStream.print("high")

    env.execute()
  }
}

object TempStageProcess {
  // Side-output tags, defined once so the ids "low-tmp"/"high-tmp" cannot drift apart
  val lowTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("low-tmp")
  val highTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("high-tmp")
}

class TempStageProcess extends ProcessFunction[SensorReading, SensorReading] {
  import TempStageProcess.{highTmp, lowTmp}

  override def processElement(value: SensorReading,
                              context: ProcessFunction[SensorReading, SensorReading]#Context,
                              collector: Collector[SensorReading]): Unit = {
    if (value.temperature < 10) {
      // Below 10: emit (deviceNo, temperature) to the "low-tmp" side output
      context.output(lowTmp, (value.deviceNo, value.temperature))
    } else if (value.temperature > 70) {
      // Above 70: emit (deviceNo, temperature) to the "high-tmp" side output
      context.output(highTmp, (value.deviceNo, value.temperature))
    } else {
      // Everything in [10, 70] stays in the main output as a SensorReading
      collector.collect(value)
    }
  }
}
```

Test file contents (D:\tmp\file1.txt):

```
设备8,1610035289736,84.3
设备5,1610035371758,38.8
设备5,1610035458637,60.2
设备1,1610035543127,10.2
设备7,1610035623427,51.6
设备5,1610035705302,20.1
设备5,1610035787387,12.9
设备7,1610035877019,88.2
设备6,1610035960537,33.5
设备7,1610036043040,63.0
设备5,1610036125179,64.5
设备6,1610036214972,30.2
设备5,1610036296542,56.5
设备7,1610036377999,29.7
设备6,1610036467523,59.4
设备4,1610036557446,71.1
设备5,1610036641100,28.2
设备2,1610036725803,88.8
设备8,1610036808041,73.5
设备1,1610036897060,18.0
设备7,1610036980127,14.9
设备2,1610037069523,47.4
设备4,1610037154507,59.5
设备5,1610037235099,35.0
设备6,1610037317868,76.4
设备2,1610037403367,10.0
设备2,1610037484177,18.5
设备4,1610037571384,98.7
设备5,1610037653666,95.6
设备6,1610037735520,32.6
设备6,1610037823906,83.3
设备3,1610037913756,29.1
设备7,1610037994980,74.6
设备6,1610038081606,22.2
设备3,1610038163043,10.4
设备5,1610038244717,56.9
设备3,1610038326227,64.8
设备4,1610038411053,65.0
设备8,1610038500538,93.2
设备8,1610038583924,76.2
设备1,1610038670150,42.1
设备5,1610038756839,35.1
设备3,1610038840180,75.9
设备3,1610038929751,83.4
设备7,1610039019422,24.1
设备3,1610039101778,85.0
设备8,1610039183077,45.6
设备3,1610039264498,79.5
设备1,1610039351600,44.4
设备8,1610039434187,73.3
设备3,1610039518048,77.9
设备7,1610039598556,9.79
设备4,1610039679144,19.0
设备2,1610039761967,56.1
设备3,1610039847823,88.2
设备6,1610039933024,77.4
设备7,1610040014212,14.4
设备4,1610040101627,98.2
设备8,1610040182379,85.0
设备6,1610040265210,61.8
设备2,1610040345769,48.0
设备3,1610040432855,19.9
设备4,1610040515943,30.9
设备4,1610040601373,51.7
设备1,1610040681803,29.7
设备8,1610040770779,31.6
设备3,1610040851986,67.1
设备4,1610040941421,93.2
设备7,1610041022836,37.2
设备8,1610041105401,84.6
设备6,1610041189301,19.2
设备4,1610041270735,99.0
设备4,1610041354109,77.0
设备5,1610041435113,49.7
设备1,1610041521773,74.2
设备8,1610041603035,42.2
设备3,1610041687230,87.1
设备1,1610041767985,82.7
设备3,1610041848130,0.59
设备4,1610041933021,7.38
设备2,1610042016080,28.9
设备2,1610042103229,99.2
设备2,1610042190222,42.2
设备3,1610042277841,12.0
设备7,1610042364076,93.5
设备7,1610042444652,10.5
设备4,1610042530461,68.5
设备1,1610042615421,78.2
设备3,1610042702219,18.5
设备6,1610042787478,64.8
设备5,1610042874301,6.34
设备2,1610042956073,65.6
设备8,1610043038793,10.6
设备8,1610043122971,30.3
设备7,1610043203810,17.5
设备8,1610043291566,83.8
设备5,1610043373188,30.5
设备2,1610043456107,84.7
设备1,1610043545998,53.4
设备3,1610043627174,97.4
```

Output:
```
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/.m2/repository/ch/qos/logback/logback-classic/1.2.0/logback-classic-1.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17:19:42,659 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:42,725 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:43,088 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
17:19:43,089 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
high> (设备8,84.3)
main> SensorReading(设备5,1610035371758,38.8)
main> SensorReading(设备5,1610035458637,60.2)
main> SensorReading(设备1,1610035543127,10.2)
main> SensorReading(设备7,1610035623427,51.6)
main> SensorReading(设备5,1610035705302,20.1)
main> SensorReading(设备5,1610035787387,12.9)
high> (设备7,88.2)
main> SensorReading(设备6,1610035960537,33.5)
main> SensorReading(设备7,1610036043040,63.0)
main> SensorReading(设备5,1610036125179,64.5)
main> SensorReading(设备6,1610036214972,30.2)
main> SensorReading(设备5,1610036296542,56.5)
main> SensorReading(设备7,1610036377999,29.7)
main> SensorReading(设备6,1610036467523,59.4)
high> (设备4,71.1)
main> SensorReading(设备5,1610036641100,28.2)
high> (设备2,88.8)
high> (设备8,73.5)
main> SensorReading(设备1,1610036897060,18.0)
main> SensorReading(设备7,1610036980127,14.9)
main> SensorReading(设备2,1610037069523,47.4)
main> SensorReading(设备4,1610037154507,59.5)
main> SensorReading(设备5,1610037235099,35.0)
high> (设备6,76.4)
main> SensorReading(设备2,1610037403367,10.0)
main> SensorReading(设备2,1610037484177,18.5)
high> (设备4,98.7)
high> (设备5,95.6)
main> SensorReading(设备6,1610037735520,32.6)
high> (设备6,83.3)
main> SensorReading(设备3,1610037913756,29.1)
high> (设备7,74.6)
main> SensorReading(设备6,1610038081606,22.2)
main> SensorReading(设备3,1610038163043,10.4)
main> SensorReading(设备5,1610038244717,56.9)
main> SensorReading(设备3,1610038326227,64.8)
main> SensorReading(设备4,1610038411053,65.0)
high> (设备8,93.2)
high> (设备8,76.2)
main> SensorReading(设备1,1610038670150,42.1)
main> SensorReading(设备5,1610038756839,35.1)
high> (设备3,75.9)
high> (设备3,83.4)
main> SensorReading(设备7,1610039019422,24.1)
high> (设备3,85.0)
main> SensorReading(设备8,1610039183077,45.6)
high> (设备3,79.5)
main> SensorReading(设备1,1610039351600,44.4)
high> (设备8,73.3)
high> (设备3,77.9)
low> (设备7,9.79)
main> SensorReading(设备4,1610039679144,19.0)
main> SensorReading(设备2,1610039761967,56.1)
high> (设备3,88.2)
high> (设备6,77.4)
main> SensorReading(设备7,1610040014212,14.4)
high> (设备4,98.2)
high> (设备8,85.0)
main> SensorReading(设备6,1610040265210,61.8)
main> SensorReading(设备2,1610040345769,48.0)
main> SensorReading(设备3,1610040432855,19.9)
main> SensorReading(设备4,1610040515943,30.9)
main> SensorReading(设备4,1610040601373,51.7)
main> SensorReading(设备1,1610040681803,29.7)
main> SensorReading(设备8,1610040770779,31.6)
main> SensorReading(设备3,1610040851986,67.1)
high> (设备4,93.2)
main> SensorReading(设备7,1610041022836,37.2)
high> (设备8,84.6)
main> SensorReading(设备6,1610041189301,19.2)
high> (设备4,99.0)
high> (设备4,77.0)
main> SensorReading(设备5,1610041435113,49.7)
high> (设备1,74.2)
main> SensorReading(设备8,1610041603035,42.2)
high> (设备3,87.1)
high> (设备1,82.7)
low> (设备3,0.59)
low> (设备4,7.38)
main> SensorReading(设备2,1610042016080,28.9)
high> (设备2,99.2)
main> SensorReading(设备2,1610042190222,42.2)
main> SensorReading(设备3,1610042277841,12.0)
high> (设备7,93.5)
main> SensorReading(设备7,1610042444652,10.5)
main> SensorReading(设备4,1610042530461,68.5)
high> (设备1,78.2)
main> SensorReading(设备3,1610042702219,18.5)
main> SensorReading(设备6,1610042787478,64.8)
low> (设备5,6.34)
main> SensorReading(设备2,1610042956073,65.6)
main> SensorReading(设备8,1610043038793,10.6)
main> SensorReading(设备8,1610043122971,30.3)
main> SensorReading(设备7,1610043203810,17.5)
high> (设备8,83.8)
main> SensorReading(设备5,1610043373188,30.5)
high> (设备2,84.7)
main> SensorReading(设备1,1610043545998,53.4)
high> (设备3,97.4)
Process finished with exit code 0
```
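One practical caveat about the job above: its `map` step calls `split(1).trim.toLong` and `split(2).trim.toDouble` directly, so a single blank or malformed line in `file1.txt` throws an exception and fails the whole job. If you would rather skip such lines, here is a minimal sketch of a defensive parser, assuming dropping bad input is acceptable for your use case. `ReadingParser.parse` is a hypothetical helper (not a Flink API), and `SensorReading` is redeclared so the snippet compiles on its own.

```scala
import scala.util.Try

case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object ReadingParser {
  // Parses "deviceNo,timestamp,temperature"; returns None for blank or malformed lines
  def parse(line: String): Option[SensorReading] = {
    val fields = line.split(",").map(_.trim)
    if (fields.length != 3) None
    else
      for {
        ts   <- Try(fields(1).toLong).toOption   // None if the timestamp is not a Long
        temp <- Try(fields(2).toDouble).toOption // None if the temperature is not a Double
      } yield SensorReading(fields(0), ts, temp)
  }
}
```

In the Flink job, the `map` call on the `DataStream[String]` read from the file could then become `.flatMap(line => ReadingParser.parse(line).toList)`, so invalid lines simply produce no output. Whether to drop such lines silently, log them, or send them to yet another side output is a design choice.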