Kafka+SparkStream+Hive的项目实现方法是什么
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容主要讲解"Kafka+SparkStream+Hive的项目实现方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Kafka+SparkSt
千家信息网最后更新 2025年12月03日Kafka+SparkStream+Hive的项目实现方法是什么
本篇内容主要讲解"Kafka+SparkStream+Hive的项目实现方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Kafka+SparkStream+Hive的项目实现方法是什么"吧!
目前的项目中需要将kafka队列的数据实时存到hive表中。
import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimport org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe def main(args: Array[String]): Unit = { // val conf = new SparkConf() // conf.setMaster("local") // conf.setAppName("SparkStreamingOnKafkaDirect") val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3)) //设置日志级别 ssc.sparkContext.setLogLevel("Error") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "MyGroupId", // /** * 当没有初始的offset,或者当前的offset不存在,如何处理数据 * earliest :自动重置偏移量为最小偏移量 * latest:自动重置偏移量为最大偏移量【默认】 * none:没有找到以前的offset,抛出异常 */ "auto.offset.reset" -> "earliest", /** * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交 */ "enable.auto.commit" -> (false: java.lang.Boolean) //默认是true ) //设置Kafka的topic val topics = Array("test") //创建与Kafka的连接,接收数据 /*这里接收到数据的样子 2019-09-26 1569487411604 1235 497 Kafka Register 2019-09-26 1569487411604 1235 497 Kafka Register 2019-09-26 1569487414838 390 778 Flink View */ val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, // Subscribe[String, String](topics, kafkaParams) ) //对接收到的数据进行处理,打印出来接收到的key跟value,最后放回的是valueval transStrem: DStream[String] = stream.map(record => { val key_value = (record.key, record.value) println("receive message key = " + key_value._1) println("receive message value = " + key_value._2) key_value._2 }) //这里用了一下动态创建的Schema val structType: StructType = StructType(List[StructField]( StructField("Date_", StringType, nullable = true), StructField("Timestamp_", StringType, nullable = true), StructField("UserID", StringType, nullable = true), StructField("PageID", StringType, nullable = true), StructField("Channel", StringType, nullable = true), StructField("Action", StringType, nullable = true) )) //因为foreachRDD可以拿到封装到DStream中的rdd,可以对里面的rdd进行, /*代码解释: 先从foreach中拿到一条数据,,在函数map中对接收来的数据用 "\n" 进行切分,放到Row中,用的是动态创建Schema,因为我们需要再将数据存储到hive中,所以需要Schema。 因为map是transformance算子,所以用rdd.count()触发一下 spark.createDataFrame:创建一个DataFrame,因为要注册一个临时表,必须用到DataFrame frame.createOrReplaceTempView("t1"):注册临时表 spark.sql("use spark"):使用 hive 的 spark 库 result.write.mode(SaveMode.Append).saveAsTable("test_kafka"):将数据放到 test_kafka 中 */ transStrem.foreachRDD(one => { val rdd: RDD[Row] = one.map({ a => val arr = a.toString.split("\t") Row(arr(0).toString, arr(1).toString, arr(2).toString, arr(3).toString, arr(4).toString, arr(5).toString) }) rdd.count() val frame: DataFrame = spark.createDataFrame(rdd, structType) // println(" Scheme: "+frame.printSchema()) frame.createOrReplaceTempView("t1") // spark.sql("select * from t1").show() spark.sql("use spark") spark.sql("select * from t1"). write.mode(SaveMode.Append).saveAsTable("test_kafka") } ) /** * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。 */ stream.foreachRDD { rdd => val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } ssc.start() ssc.awaitTermination() ssc.stop() }到此,相信大家对"Kafka+SparkStream+Hive的项目实现方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
数据
方法
项目
偏移
消费者
处理
消费
内容
动态
学习
实用
更深
最大
最小
业务
代码
兴趣
函数
动向
实时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
图片数据库解决的问题
删除数据库如何恢复数据
青浦区环保网络技术
php可以引用数据库吗
软件开发人员包括几种
我国的计算机网络技术
上海百胜软件开发有限公司
山东pdu服务器电源生产厂
it行业与软件开发哪个好
软件开发行业的关键绩效指标
黑龙江省威海软件开发
ibm服务器维修服务哪里好
贵阳庶足网络技术有限公司
吉林大学网络安全工程
数据库怎么管理
黄浦区辅助软件开发有哪些
奉贤区上门软件开发厂家活动方案
国产数据库压力测试工具
数据库查询返回条数
word连接数据库服务器
德温特专利数据库的收录范围
bim数据库构建组成数据表
中国网络安全泰斗
黄山工业网络安全
网络安全化管理
推动公司网络安全
数据库安全的第一套保障
利普网络安全产品
linux删除mysql数据库
那里有软件开发培训