第97课:Spark Streaming 结合Spark SQL 案例
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,代码如下:package com.dt.spark.streamingimport org.apache.spark.sql.SQLContextimport org.apache.spark.{Sp
千家信息网最后更新 2025年12月03日第97课:Spark Streaming 结合Spark SQL 案例
代码如下:
package com.dt.spark.streamingimport org.apache.spark.sql.SQLContextimport org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.streaming.{StreamingContext, Duration}/** * 使用SparkStreaming结合SparkSQL对日志进行分析。 * 假设电商网站点击日志格式(简化)如下: * userid,itemId,clickTime * 需求:处理10分钟内item点击次数排序Top10,并且将商品名称显示出来。商品itemId与商品名称的对应关系存放在MySQL数据库中 * Created by dinglq on 2016/5/4. */object LogAnalyzerStreamingSQL { val WINDOW_LENGTH = new Duration(600 * 1000) val SLIDE_INTERVAL = new Duration(10 * 1000) def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ //从数据库中加载itemInfo表 val itemInfoDF = sqlContext.read.format("jdbc").options(Map( "url"-> "jdbc:mysql://spark-master:3306/spark", "driver"->"com.mysql.jdbc.Driver", "dbtable"->"iteminfo", "user"->"root", "password"-> "vincent" )).load() itemInfoDF.registerTempTable("itemInfo") val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL) val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming") val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache() val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL) windowDStream.foreachRDD(accessLogs => { if (accessLogs.isEmpty()) { println("No logs received in this time interval") } else { accessLogs.toDF().registerTempTable("accessLogs") val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " + " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " + " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 " val topTenClickItemLast10Minus = sqlContext.sql(sqlStr) // Persist top ten table for this window to HDFS as parquet file topTenClickItemLast10Minus.show() } }) streamingContext.start() streamingContext.awaitTermination() }}case class AccessLog(userId: String, itemId: String, clickTime: String) {}object AccessLog { def parseLogLine(log: String): AccessLog = { val logInfo = log.split(",") if (logInfo.length == 3) { AccessLog(logInfo(0),logInfo(1), logInfo(2)) } else { AccessLog("0","0","0") } }}MySQL中表的内容如下:
mysql> select * from spark.iteminfo;+--------+----------+| itemid | itemname |+--------+----------+| 001 | phone || 002 | computer || 003 | TV |+--------+----------+3 rows in set (0.00 sec)
在D创建目录logs_incoming
运行Spark Streaming 程序。
新建文件,内容如下:
0001,001,2016-05-04 22:10:200002,001,2016-05-04 22:10:210003,001,2016-05-04 22:10:220004,002,2016-05-04 22:10:230005,002,2016-05-04 22:10:240006,001,2016-05-04 22:10:250007,002,2016-05-04 22:10:260008,001,2016-05-04 22:10:270009,003,2016-05-04 22:10:280010,003,2016-05-04 22:10:290011,001,2016-05-04 22:10:300012,003,2016-05-04 22:10:310013,003,2016-05-04 22:10:32
将文件保存到目录logs_incoming 中,观察Spark程序的输出:
+------+--------+---+|itemid|itemname|cnt|+------+--------+---+| 001| phone| 6|| 003| TV| 4|| 002|computer| 3|+------+--------+---+
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
商品
数据
内容
名称
数据库
文件
日志
目录
程序
中表
代码
公众
备注
大数
实战
工厂
格式
次数
网站
需求
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全96110
绩溪进口软件开发服务解决方案
大海战烈焰与重生服务器端
山东腾纵软件开发公司
淮南深信服网络安全
娄底学数据库软件应用能力水平高
南京中兴软件开发大概月薪多少
中学网络安全应急演练方案
武警网络安全教育宣传片视频
广东省网络安全培训机构
产品数据库意义是什么
软件开发轮廓图标
网络安全性多少年检测一次
虹口区管理软件开发代理品牌
WPS是用什么软件开发的
杭州软件开发精修培训机构
肥东网络技术服务收费
电子竞技改成了网络安全
济南康健网络技术
八小网络安全教育
数据库补全工具
网络安全和软件开发区别
数据库实验心得
应急厅网络安全建设
网络技术合作框架协议
血站网络安全工作汇报
知网+全文数据库
关于网络技术发展
网络安全执法成就2022
郴州软件开发培训哪家强