Flink开发怎样进行实时处理应用程序
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。使用Flink + java实现需求环境JDK:1.8
千家信息网最后更新 2025年12月03日Flink开发怎样进行实时处理应用程序
本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
使用Flink + java实现需求
环境
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
使用上一节中的springboot-flink-train项目
开发步骤
第一步:创建流处理上下文环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
第二步:读取数据,使用socket流方式读取数据
DataStreamSourcetext = env.socketTextStream("192.168.152.45", 9999);
第三步:transform
text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。
第四步:执行
env.execute("StreamingWCJavaApp");整体代码如下:
/** * 使用Java API来开发Flink的实时处理应用程序 * wc统计的数据源自socket */public class StreamingWCJava02App { public static void main(String[] args) throws Exception { // 获取参数 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未设置, 使用默认端口9999"); port = 9999; } // step1: 获取流处理上下文环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // step2: 读取数据 DataStreamSource text = env.socketTextStream("192.168.152.45", port); // step3: transform text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print(); env.execute("StreamingWCJavaApp"); }} 运行
首先在192.168.152.45上运行命令
nc -l 9999
然后在运行main方法。在192.168.152.45的nc上输入
abc,def,abc,ddd
在idea控制台输出如下:
4> (abc,2)1> (def,1)4> (ddd,1)
这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:
text.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2 (token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
这样控制台的打印结果如下:
(abc,2)(ddd,1)(def,1)
这样一个简单的demo就成功了!
重构代码
上面的代码中localhost与port需要用参数传递进来。
代码如下:
// 获取参数 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未设置, 使用默认端口9999"); port = 9999; }使用Flink提供的ParameterTool来接收参数。
我们在运行时就可以指定参数列表了,其中的key必须以"-"或者"--"开头。
在运行时,配置参数:
这样运行就可以从外界传递参数了
使用Flink + Scala实现需求
接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:
/** * 使用Scala开发Flink的实时处理应用程序 */object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隐式转换 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map((_,1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); }}这种方式比java实现更加简洁。
上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。
参数
处理
运行
开发
实时
应用程序
程序
应用
代码
数据
端口
内容
方式
环境
上下
上下文
技能
控制台
知识
需求
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发成果包括源代码
网警在行动网络安全需预防
校园网络安全手抄报电子版
渭南软件开发厂家
花雨庭服务器进不去国际服
杭州大晟网络技术有限公司
.数据库技术的核心是
软件开发用家庭版
关系数据库的应用
多益网络技术社招有几轮面试
网络安全工作压力
accesst数据库引擎
基金地投有网络安全板块吗
数据库引擎BDE下载
软件开发队伍建设方案
差异数据库备份与恢复
电子电路开发 软件开发
王者中怎样选取手q服务器
如何清空360浏览器历史数据库
汉南定制软件开发方案
六盘水服务器显卡加盟
对网络安全非传统安全的认识
天融信服务器raid配置
酒店管理系统数据库实施
网络安全知识答题答案20题
2018国家网络安全宣传单
软件开发外包账务处理
安全狗无法连接服务器
试述数据库设计的全过程
软件开发门槛高不高