Spark Streaming 实现数据实时统计案例
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。上图讲解运行环境:需要linux
千家信息网最后更新 2025年12月01日Spark Streaming 实现数据实时统计案例
Spark 是一个基于内存式的分布式计算框架。具有高性能,高效可扩展,容错等优点。
今天讲解一下spark的流计算,其实它也不完全是实时的流计算,算是一种准实时的流计算。
上图讲解

运行环境:需要linux环境下的spark环境
本例用的centOS 6.5x64 因为需要使用TCP协议传输数据,所以需要安装一个nc插件。
安装方式: yum install ncxxx 或者挂载光盘安装
安装后启动nc -lk 9999 端口可以随便指定,最好是1024以上的就可以。
下面贴出代码
java版本的
import java.util.Arrays;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import com.google.common.base.Optional;import scala.Tuple2;public class SparkDemo { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("sparkDemo2").setMaster("local[3]"); JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(5)); //使用带状态的算子,需要checkpoint做容错处理 jsc.checkpoint("D://chkspark"); JavaReceiverInputDStream socketTextStream=jsc.socketTextStream("10.115.27.234", 1000); JavaDStream wordsDstream=socketTextStream.flatMap(new FlatMapFunction() { private static final long serialVersionUID=1L; public Iterable call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream wordsToPairDstream=wordsDstream.mapToPair(new PairFunction() { private static final long SerialVersionUID=1L; public Tuple2 call(String word) throws Exception { return new Tuple2(word, 1); } }); /** * 一个batch对应一个RDD。 * */ JavaPairDStream resultDstream=wordsToPairDstream.updateStateByKey(new Function2, Optional, Optional>() { private static final long serialVersionUID=1L; public Optional call(List values, Optional state) throws Exception { Integer oldValue=0; //默认旧value是0 if (state.isPresent()) { oldValue=state.get(); } for (Integer value:values) { oldValue+=value; } return Optional.of(oldValue); } }); //打印结果 resultDstream.print(); jsc.start(); jsc.awaitTermination(); }}
程序测试: 从linux端的nc 下输入任意字符串,spark streaming会实时对输入的数据做出统计。类似于wordcount. 除非手动kill这个进程,否则会一直运行下去。因为它的原理就是和自来水的水流一样,是一连串的数据流。
运行结果展示:

也可以用scala写出同样的程序,代码量更少。
需要深入理解spark streaming的架构原理。
实时
数据
环境
运行
代码
原理
程序
结果
容错
输入
统计
一连串
上图
优点
光盘
内存
分布式
字符
字符串
就是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
i春秋网络安全分享
日本声优手机数据库
如何爬取公众号的数据库
无锡智能软件开发销售电话
商品三级种类数据库设计
网络安全事态感知
杭州网络安全研究所待遇
数据库关系模式概述
电脑服务器代理我可能有问题
淘宝网络技术安全总监
网络安全手抄报高清大图
教资支付时内部服务器错误怎么办
家用宽带搭建服务器
方块方舟服务器管理
保障网络安全更换设备
网络安全员学薪
网络技术计算目的网络
网上实现Java数据库
决策中国人物影响力数据库
互联网科技如何
江西综合软件开发价格大全
我的世界1.17.7服务器推荐
数据库新建实例命令
通信网络安全展会
物流数据库设计总结
暗黑破坏神一直连接暴雪服务器
山西新一代软件开发设施
服务器桌面显示图片图标
软件开发和芯片
服务器如何提供下载库