Flink1.8中如何进行流处理SocketWordCount
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。概
千家信息网最后更新 2025年12月03日Flink1.8中如何进行流处理SocketWordCount
本篇文章给大家分享的是有关Flink1.8中如何进行流处理SocketWordCount,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
概述:
这里主要演示flink源码实例中"WordCount"程序的流窗口版本。
此程序连接到socket服务器并从socket读取字符串。最简单的尝试方法是打开一个文本服务器(在端口9999),使用netcat工具
我这里也贴一下:
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.hadoop.ljs.flink.streaming;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;/*** Implements a streaming windowed version of the "WordCount" program.**This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)* using the netcat tool via** nc -l 12345 on Linux or nc -l -p 12345 on Windows*
* and run this example with the hostname and the port as arguments. */@SuppressWarnings("serial")public class SocketWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port");
/*hostname = "10.124.165.98"; port = 9999;*/ } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWordCount " + "--hostname --port ', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l ' and " + "type the input text into the command line"); return; }
// get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket DataStream text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts DataStream windowCounts = text
.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } })
.keyBy("word") .timeWindow(Time.seconds(5))
.reduce(new ReduceFunction() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } });
// print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount"); }
// ------------------------------------------------------------------------
/** * Data type for words with count. */ public static class WordWithCount {
public String word; public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) { this.word = word; this.count = count; }
@Override public String toString() { return word + " : " + count; } }}
通过maven package打出jar包:flink191-1.0-SNAPSHOT-jar-with-dependencies
直接提交到flink在yarn中已启动的一个session中,从flink界面上传jar:
上传后,选中jar前面的复选框,可直接填写相关参数:
参数格式:--参数名 参数值 --参数名2 参数值2
参数获取是通过上面代码第49行的工具类获取(固定格式):
ParameterTool params = ParameterTool.fromArgs(args);
最后点击"Submit"按钮,提交任务运行即可。
界面也可查看日志和输出:
以上就是Flink1.8中如何进行流处理SocketWordCount,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
参数
处理
工具
更多
服务器
格式
界面
知识
程序
篇文章
服务
实用
代码
任务
字符
字符串
实例
就是
工作会
按钮
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全事故防控规范
计算机复试专业课问题数据库
延庆区一站式网络技术好处
常用的网络安全技术有什么
湖北网络安全学院哪里学
论文题目计算机网络技术
连接服务器后怎么进不去
上海志寻网络技术有限
昆山进口服务器代理厂家
艾比利网络技术有限公司工作
客户分类数据库管理
大话服务器状态
以下不是wlan网络安全性
吉林信科互联网科技有限公司
网络安全从我做起主题班会视频
java软件开发学徒
搜索网站需要服务器吗
宁波应用软件开发项目管理
数据库drop与
浪潮存储服务器怎么配置
算法软件开发工程师
山西通用软件开发收费
云服务器修改管理员权限代码
威海互联网养老软件开发系统
茂名数字软件开发商家
oracle服务器端配置
安庆华璟网络技术服务有限公司
短信提示无法连接服务器
江门通讯软件开发报价行情
vb远程连接数据库