spark作业怎么实现
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇"spark作业怎么实现"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"spark
千家信息网最后更新 2025年12月02日spark作业怎么实现
这篇"spark作业怎么实现"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"spark作业怎么实现"文章吧。
将sample.log的数据发送到Kafka中,经过Spark Streaming处理,将数据格式变为以下形式:commandid | houseid | gathertime | srcip | destip |srcport| destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid在发送到kafka的另一个队列中要求:1、sample.log => 读文件,将数据发送到kafka队列中2、从kafka队列中获取数据(0.10 接口不管理offset),变更数据格式3、处理后的数据在发送到kafka另一个队列中分析1 使用课程中的redis工具类管理offset2 读取日志数据发送数据到topic13 消费主题,将数据的分割方式修改为竖线分割,再次发送到topic2
1.OffsetsWithRedisUtils
package home.oneimport java.utilimport org.apache.kafka.common.TopicPartitionimport org.apache.spark.streaming.kafka010.OffsetRangeimport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}import scala.collection.mutableobject OffsetsWithRedisUtils { // 定义Redis参数 privateval redisHost = "linux123" privateval redisPort = 6379 // 获取Redis的连接 privateval config = new JedisPoolConfig // 最大空闲数 config.setMaxIdle(5) // 最大连接数 config.setMaxTotal(10) privateval pool = new JedisPool(config, redisHost, redisPort, 10000) private def getRedisConnection: Jedis = pool.getResource privateval topicPrefix = "kafka:topic" // Key:kafka:topic:TopicName:groupid private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid" // 根据 key 获取offsets def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = { val jedis: Jedis = getRedisConnection val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic => val key = getKey(topic, groupId) import scala.collection.JavaConverters._ // 将获取到的redis数据由Java的map转换为scala的map,数据格式为{key:[{partition,offset}]} jedis.hgetAll(key) .asScala .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong } } // 归还资源 jedis.close() offsets.flatten.toMap } // 将offsets保存到Redis中 def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = { // 获取连接 val jedis: Jedis = getRedisConnection // 组织数据 offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))} .groupBy(_._1) .foreach{case (topic, buffer) => val key: String = getKey(topic, groupId) import scala.collection.JavaConverters._ // 同样将scala的map转换为Java的map存入redis中 val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava // 保存数据 jedis.hmset(key, maps) } jedis.close() }}KafkaProducer
package home.oneimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.kafka.common.serialization.StringSerializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object KafkaProducer { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]") val sc = new SparkContext(conf) // 读取sample.log文件数据 val lines: RDD[String] = sc.textFile("data/sample.log") // 定义 kafka producer参数 val prop = new Properties() // kafka的访问地址 prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") // key和value的序列化方式 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) // 将读取到的数据发送到mytopic1 lines.foreachPartition{iter => // 初始化KafkaProducer val producer = new KafkaProducer[String, String](prop) iter.foreach{line => // 封装数据 val record = new ProducerRecord[String, String]("mytopic1", line) // 发送数据 producer.send(record) } producer.close() } }}3.HomeOne
package home.oneimport java.util.Propertiesimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}object HomeOne { val log = Logger.getLogger(this.getClass) def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) // 需要消费的topic val topics: Array[String] = Array("mytopic1") val groupid = "mygroup1" // 定义kafka相关参数 val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid) // 从Redis获取offset val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid) // 创建DStream val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, // 从kafka中读取数据 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets) ) // 转换后的数据发送到另一个topic dstream.foreachRDD { rdd => if (!rdd.isEmpty) { // 获取消费偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 处理数据发送到topic2 rdd.foreachPartition(process) // 将offset保存到Redis OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid) } } // 启动作业 ssc.start() // 持续执行 ssc.awaitTermination() } // 将处理后的数据发送到topic2 def process(iter: Iterator[ConsumerRecord[String, String]]) = { iter.map(line => parse(line.value)) .filter(!_.isEmpty) .foreach(line => sendMsg2Topic(line, "mytopic2")) } // 调用kafka生产者发送消息 def sendMsg2Topic(msg: String, topic: String): Unit = { val producer = new KafkaProducer[String, String](getKafkaProducerParameters()) val record = new ProducerRecord[String, String](topic, msg) producer.send(record) } // 修改数据格式,将逗号分隔变成竖线分割 def parse(text: String): String = { try { val arr = text.replace("<<>>", "").split(",") if (arr.length != 15) return "" arr.mkString("|") } catch { case e: Exception => log.error("解析数据出错!", e) "" } } // 定义kafka消费者的配置信息 def getKafkaConsumerParameters(groupid: String): Map[String, Object] = { Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> groupid, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean), ) } // 定义生产者的kafka配置 def getKafkaProducerParameters(): Properties = { val prop = new Properties() prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop }}2
/*假设机场的数据如下:1, "SFO"2, "ORD"3, "DFW"机场两两之间的航线及距离如下:1, 2,18002, 3, 8003, 1, 1400用 GraphX 完成以下需求:求所有的顶点求所有的边求所有的triplets求顶点数求边数求机场距离大于1000的有几个,有哪些按所有机场之间的距离排序(降序),输出结果 */
代码:
import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.graphx.{Edge, Graph, VertexId}import org.apache.spark.rdd.RDDobject TwoHome { def main(args: Array[String]): Unit = { // 初始化 val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("warn") //初始化数据 val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW")) val edgeArray: Array[Edge[Int]] = Array( Edge(1L, 2L, 1800), Edge(2L, 3L, 800), Edge(3L, 1L, 1400) ) //构造vertexRDD和edgeRDD val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray) val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray) //构造图 val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD) //所有的顶点 println("所有顶点:") graph.vertices.foreach(println) //所有的边 println("所有边:") graph.edges.foreach(println) //所有的triplets println("所有三元组信息:") graph.triplets.foreach(println) //求顶点数 val vertexCnt = graph.vertices.count() println(s"总顶点数:$vertexCnt") //求边数 val edgeCnt = graph.edges.count() println(s"总边数:$edgeCnt") //机场距离大于1000的 println("机场距离大于1000的边信息:") graph.edges.filter(_.attr > 1000).foreach(println) //按所有机场之间的距离排序(降序) println("降序排列所有机场之间距离") graph.edges.sortBy(-_.attr).collect().foreach(println) }}运行结果
以上就是关于"spark作业怎么实现"这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注行业资讯频道。
数据
机场
内容
作业
之间
格式
队列
处理
消费
信息
参数
点数
顶点
最大
文件
文章
方式
生产者
知识
竖线
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
大连服务器机柜导轨安装图
网络技术网络安全网络文明
虚拟机配置ftp服务器的步骤
数据库技术培训中心
运营商 网络安全
内蒙古网络技术培训
服务器有自动登录功能吗
广州物流软件开发机构
网上学习网络技术
电影服务器是什么意思
网络安全怎么找兼职
温岭巨型软件开发维修价格
戴尔服务器 硬盘启动
网络安全与网络攻防一样吗
计算机网络技术电脑配置
自己搭建任务管理服务器
青浦区手机软件开发公司
澳门网络安全法疑问
云服务公司服务器报废
金晴云华服务器
中国台湾餐饮软件开发需要多少钱
怎么合并三个网站数据库
环县网络安全进展
互联网科技融合公司
芜湖网线网络技术公司
吉林博兴服务器
网页制作服务器域名
2020网络技术考试真题
数据库期末上机考试
车载网络技术服务