千家信息网

Spark Streaming运行流程是怎样的

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容介绍了"Spark Streaming运行流程是怎样的"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2025年12月01日Spark Streaming运行流程是怎样的

本篇内容介绍了"Spark Streaming运行流程是怎样的"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

通过下面的一个简单的例子来理解spark streaming

object OnlineForeachRDD2DB {  def main(args: Array[String]){    /*      * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,      * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置      * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如      * 只有1G的内存)的初学者       *      */    val conf = new SparkConf() //创建SparkConf对象    conf.setAppName("OnlineForeachRDD") //设置应用程序的名称,在程序运行的监控界面可以看到名称//    conf.setMaster("spark://Master:7077") //此时,程序在Spark集群    conf.setMaster("local[6]")    //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口    val ssc = new StreamingContext(conf, Seconds(5))    val lines = ssc.socketTextStream("Master", 9999)    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)    wordCounts.foreachRDD { rdd =>      rdd.foreachPartition { partitionOfRecords => {        // ConnectionPool is a static, lazily initialized pool of connections        val connection = ConnectionPool.getConnection()        partitionOfRecords.foreach(record => {          val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"          val stmt = connection.createStatement();          stmt.executeUpdate(sql);        })        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse      }      }    }    /**      * 在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler      * 的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:      *   1,JobGenerator启动后会不断的根据batchDuration生成一个个的Job      *   2,ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到      *   数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker      *   内部会通过ReceivedBlockTracker来管理接受到的元数据信息      * 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD      * 的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个      * 单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),为什么使用线程池呢?      *   1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;      *   2,有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;      *      */    ssc.start()    ssc.awaitTermination()  }}

"Spark Streaming运行流程是怎样的"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

运行 线程 程序 生成 信息 数据 方法 集群 配置 流程 不断 内容 名称 对象 方式 是在 更多 知识 中通 作业 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 公众号服务器哪里可以租 重庆电脑软件开发哪家实惠 c语言用啥函数输入数据库 数据库安全的课题论证 网络安全法律法规的解读 广州协手网络技术有限公司 流体模拟软件开发公司 重大网络安全事件处置记录表 数据库系统概率第三章课后答案 数据库中统计空值 上海新能源网络技术厂家现货 网页不可识别数据库格式 戴尔r510服务器 网络安全密钥的安全关键字 福建惠普服务器虚拟化迁移云主机 服务器开发用到的软件 句容市网络安全宣传周 浙江专业网络技术服务创新服务 101打印机服务器 网络安全手秒报教画 初中生学软件开发的学校 勒索病毒毒对数据库的影响 cmd进入数据库的密码 服务器 热插拔 数据库分库分表解决什么问题 网页不可识别数据库格式 网络安全事件处置时限要求 能源互联网 弘讯科技 软件开发项目订单模板 软件开发和服务协议
0