SparkStreaming的实现和使用方法
发表于:2025-11-11 作者:千家信息网编辑
千家信息网最后更新 2025年11月11日,这篇文章主要讲解了"SparkStreaming的实现和使用方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"SparkStreaming的实现和使用
千家信息网最后更新 2025年11月11日SparkStreaming的实现和使用方法
这篇文章主要讲解了"SparkStreaming的实现和使用方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"SparkStreaming的实现和使用方法"吧!
一.DStream 整合RDD
1.官网算子
2.使用案例
生产中使用多的是一个文件中有很多域名,另一个中是黑名单,要进行剔除数据一:日志信息 DStream domain,traffic xinlang.com xinlang.com baidu.com数据二:已有的文件 黑名单 RDD domain baidu.com
3.RDD实现上述需求
package sparkstreaming02import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]") val sc = new SparkContext(conf) val input1 = new ListBuffer[(String,Long)] input1.append(("www.xinlang.com", 8888)) input1.append(("www.xinalng.com", 9999)) input1.append(("www.baidu.com", 7777)) val data1 = sc.parallelize(input1) //进行join一定要是key,value形式的 val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = sc.parallelize(input2) data1.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true }).map(x => (x._1,x._2._1)) .collect().foreach(println) }}4.SparkStreaming实现
package sparkstreaming02import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable.ListBufferobject Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(10)) val lines = ssc.socketTextStream("s201",9999) // 数据二: rdd val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = ssc.sparkContext.parallelize(input2) lines.map(x=>(x.split(",")(0), x)).transform( rdd => { rdd.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true //注意 join之后过滤 }).map(x => (x._1,x._2._1)) } ).print() ssc.start() ssc.awaitTermination() }}二.SparkStreaming插入外部数据源
1.插入外部数据源用的,但是使用这个有几个坑
、
2.错误一官网例子

3.原因
connect 在Driver端创建,record在executor,发过去序列化错误
4.解决
解决:第一种把connect放到executor端这样弊端是每条记录会生成一个connect太耗费资源 words.foreachRDD { rdd => rdd.foreach { record => val connection = createConnection() // executed at the driver val word = record._1 val count = record._2.toInt val sql = s"insert into wc (wc,count) values($word,$count)" connection.createStatement().execute(sql) }5.最终解决办法
package sparkstreaming02import java.sql.DriverManagerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object MysqlStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("s201",9999) val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_)// words.foreachRDD { rdd =>// val connection = createConnection() // executed at the driver// rdd.foreach { record =>// val word = record._1// val count = record._2// val sql = s"insert into wc (word,count) values($word,$count)"// connection.createStatement().execute(sql)// }// }// words.foreachRDD { rdd =>// rdd.foreach { record =>// val connection = createConnection() // executed at the driver// val word = record._1// val count = record._2.toInt// val sql = s"insert into wc (wc,count) values($word,$count)"// connection.createStatement().execute(sql)// }// } //最终的写法 words.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createConnection() partitionOfRecords.foreach( record =>{ val word = record._1 val count = record._2 val sql = s"insert into wc (wc,count) values('$word',$count)" connection.createStatement().execute(sql)} ) } } ssc.start() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.cj.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false","root","123456") }}6.出现问题
错误,插入数据库时,你要插入字符串要用''例如:val sql = s"insert into wc (wc,count) values($word,$count)"word是字符串,你要不加双引号就报这个错误正确val sql = s"insert into wc (wc,count) values('$word',$count)"感谢各位的阅读,以上就是"SparkStreaming的实现和使用方法"的内容了,经过本文的学习后,相信大家对SparkStreaming的实现和使用方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
使用方法
方法
错误
学习
内容
字符
字符串
数据源
文件
问题
黑名单
黑名
例子
信息
写法
办法
原因
域名
就是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全能考的证书
网络安全工作办公室
rdo服务器任务管理器
计算机网络技术应用题答案
接收邮件服务器也称什么
数据库刷库的英文单词
酒店网络安全检查整改报告
哪家服务器
psp无法与服务器
简历工作经历软件开发
网络安全产品的书籍
如何快速比对两表中数据库
思修网络安全论文
培养网络技术专业型人才
数据库对象资源管理器新建索引
数据库的存在冲突是什么
数据库测试开发
东阳的网络技术公司招聘
蓝色引力互联网科技
网络安全考研数据
德阳市用友软件开发公司地址
数据库提示已存在表
路由器做打印服务器怎么做
数据库安全模型
家庭nas 路由服务器
服务器映射到公网的步骤
网络安全小卫士说课稿
360信息网络安全
服务器搬迁报价
新华互联网科技 照片