SparkStreaming消费kafka数据
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。实例代码package com.fwmagic.testimpo
千家信息网最后更新 2025年12月01日SparkStreaming消费kafka数据
概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。
实例代码
package com.fwmagic.testimport com.alibaba.fastjson.{JSON, JSONException}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}import org.slf4j.LoggerFactory/** * created by fwmagic */object RealtimeEtl { privateval logger = LoggerFactory.getLogger(PVUV.getClass) def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "hadoop") val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]") val spark = SparkSession.builder().config(conf).getOrCreate() val streamContext = new StreamingContext(spark.sparkContext, Seconds(5)) //直连方式相当于跟kafka的Topic至直接连接 //"auto.offset.reset:earliest(每次重启重新开始消费),latest(重启时会从最新的offset开始读取) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "fwmagic", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("access") val kafkaDStream = KafkaUtils.createDirectStream[String, String]( streamContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) //如果使用SparkStream和Kafka直连方式整合,生成的kafkaDStream必须调用foreachRDD kafkaDStream.foreachRDD(kafkaRDD => { if (!kafkaRDD.isEmpty()) { //获取当前批次的RDD的偏移量 val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges //拿出kafka中的数据 val lines = kafkaRDD.map(_.value()) //将lines字符串转换成json对象 val logBeanRDD = lines.map(line => { var logBean: LogBean = null try { logBean = JSON.parseObject(line, classOf[LogBean]) } catch { case e: JSONException => { //logger记录 logger.error("json解析错误!line:" + line, e) } } logBean }) //过滤 val filteredRDD = logBeanRDD.filter(_ != null) //将RDD转化成DataFrame,因为RDD中装的是case class import spark.implicits._ val df = filteredRDD.toDF() df.show() //将数据写到hdfs中:hdfs://hd1:9000/360 df.repartition(1).write.mode(SaveMode.Append).parquet(args(0)) //提交当前批次的偏移量,偏移量最后写入kafka kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } }) //启动 streamContext.start() streamContext.awaitTermination() streamContext.stop() }}case class LogBean(time:String, longitude:Double, latitude:Double, openid:String, page:String, evnet_type:Int)依赖环境(pom.xml)
4.0.0 com.fwmagic.360 fwmagic-360 1.0 1.8 1.8 2.11.7 2.2.2 2.7.7 UTF-8 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-sql_2.11 ${spark.version} org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-10_2.11 ${spark.version} org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-client ${hadoop.version} com.alibaba fastjson 1.2.39 net.alchim31.maven scala-maven-plugin 3.2.2 org.apache.maven.plugins maven-compiler-plugin 3.5.1 net.alchim31.maven scala-maven-plugin scala-compile-first process-resources add-source compile scala-test-compile process-test-resources testCompile org.apache.maven.plugins maven-compiler-plugin compile compile org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA
数据
偏移
消费
例子
批次
方式
中装
代码
功能
字符
字符串
实例
实时
对象
概要
消息
环境
错误
UTF-8
存储
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
excel的软件开发
如何删除11g数据库实例
数据库点餐系统的总结
网络安全应急处置预案下载
计件软件开发用什么设计
巴中软件开发项目管理
联想服务器山东代理商
bsd服务器
pdo实现查看数据库表名
原始传奇找不到自己的服务器
梅斯特软件开发
深圳软件开发有限公司多少个
网络安全会议交流发言材料
编办网络安全责任承诺书
每个软件都有自己的数据库吗
2003服务器管理器
程序员软件开发男
服务器客服招聘准备
唐山网络安全演练
网络安全类读书笔记
闲置设备改无线打印服务器
浅谈校园信息网络安全问题
数据库创建学生表2014
pdo实现查看数据库表名
信阳网络安全攻防对抗赛
注意网络安全手抄报
hive数据库查新语句
2022云服务器
计算机的网络技术题目
静安区电子网络技术均价