千家信息网

(版本定制)第10课:Spark Streaming源码解读

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本期内容:1、数据接收架构设计模式2、数据接收源码彻底研究1、Receiver接受数据的过程类似于MVC模式:Receiver,ReceiverSupervisor和Driver的关系相当于Model
千家信息网最后更新 2025年12月02日(版本定制)第10课:Spark Streaming源码解读

本期内容:

1、数据接收架构设计模式

2、数据接收源码彻底研究


1、Receiver接受数据的过程类似于MVC模式:

Receiver,ReceiverSupervisor和Driver的关系相当于Model,Control,View,也就是MVC。

Model就是Receiver,存储数据Control,就是ReceiverSupervisor,Driver是获得元数据,也就是View。

2、数据的位置信息会被封装到RDD里面。

3、Receiver接受数据,交给ReceiverSupervisor去存储数据。

4、ReceiverTracker是通过发送一个又一个的Job,每个Job只有一个Task,每个Task里面就只有一个ReceiverSupervisor,用这个函数启动每一个Receiver。


下面我们简单的看下Receiver启动流程,应用程序首先通过JobScheduler的start方法来启动receiverTracker的start方法:

def start(): Unit = synchronized {if (eventLoop != null) return // scheduler has already been startedlogDebug("Starting JobScheduler")eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)  }eventLoop.start()// attach rate controllers of input streams to receive batch completion updatesfor {    inputDStream <- ssc.graph.getInputStreams    rateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)listenerBus.start(ssc.sparkContext)receiverTracker = new ReceiverTracker(ssc)inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start() //receiver启动jobGenerator.start()  logInfo("Started JobScheduler")}

通过调用receiverTracker.start()方法来进行一系列的操作:

/** Start the endpoint and receiver execution thread. */def start(): Unit = synchronized {if (isTrackerStarted) {throw new SparkException("ReceiverTracker already started")  }if (!receiverInputStreams.isEmpty) {endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //Rpc消息通信,获取receiver的状态if (!skipReceiverLaunch) launchReceivers() //启动receiver    logInfo("ReceiverTracker started")trackerState = Started}}

下面通过画图简单的描述下Receiver启动的内部机制:


参考博客:http://blog.csdn.net/hanburgud/article/details/51471047

http://lqding.blog.51cto.com/9123978/1774426

0