(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本期内容:1、Spark Streaming资源动态分配2、Spark Streaming动态控制消费速率为什么需要动态?a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Stre
千家信息网最后更新 2025年12月03日(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
本期内容:
1、Spark Streaming资源动态分配
2、Spark Streaming动态控制消费速率
为什么需要动态?
a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。
b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。
Spark Streaming资源动态调整的时候会面临挑战:
Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。
Spark Streaming资源动态申请
1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。
// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) { logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")}_executorAllocationManager =if (dynamicAllocationEnabled) {Some(new ExecutorAllocationManager(this, listenerBus, _conf)) } else { None }_executorAllocationManager.foreach(_.start())设置spark.dynamicAllocation.enabled参数为true
这里会通过实例化ExecutorAllocationManager对象来动态分配资源,其内部是有定时器会不断的去扫描Executor的情况,通过线程池的方式调用schedule()来完成资源动态分配。
/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = { listenerBus.addListener(listener)val scheduleTask = new Runnable() {override def run(): Unit = {try { schedule() //动态调整Executor分配数量 } catch {case ct: ControlThrowable =>throw ctcase t: Throwable => logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) } } }executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)}private def schedule(): Unit = synchronized {val now = clock.getTimeMillis updateAndSyncNumExecutorsTarget(now) //更新Executor数量removeTimes.retain { case (executorId, expireTime) =>val expired = now >= expireTimeif (expired) {initializing = falseremoveExecutor(executorId) } !expired }}/** * Updates our target number of executors and syncs the result with the cluster manager. * * Check to see whether our existing allocation and the requests we've made previously exceed our * current needs. If so, truncate our target and let the cluster manager know so that it can * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @return the delta in the target number of executors. */private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {val maxNeeded = maxNumExecutorsNeededif (initializing) {// Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily0} else if (maxNeeded < numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget numExecutorsTarget = math.max(maxNeeded, minNumExecutors)numExecutorsToAdd = 1// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) { client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually needed") }numExecutorsTarget - oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS * 1000delta } else {0}}动态控制消费速率:
Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,spark.streaming.backpressure.enabled参数设置。
资源
动态
分配
调整
控制
时候
速度
运行
速率
消费
不断
参数
峰值
情况
数据
数量
处理
内容
可以通过
因素
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
湖北网络技术开发案例
便民服务平台软件开发
江阴专注软件开发怎么样
商丘租房软件开发
易语言什么软件开发的
c2m软件开发企业
安部网络安全监察举报网站
vm虚拟机做服务器
维普中文生物医学数据库入口
点石网络技术
亲亲漫画服务器
数据库中的关系运算
性读书软件开发
服务器安装win10
科大软件开发公司
小公司管理多个服务器
a7m3连接电脑ftp服务器
丹东磐古网络技术
如何看待服务器被炸事件
美国什么时候颁布网络安全法案
归档型数据库有哪些
局域网服务器怎么设置分区
如何区分软件开发工程师
网络技术服务合同审查要点
数据库中间件服务器几台
苏州电子网络技术参考价格
软件开发质量管理体系说明
保证数据库安全方法
网络安全的口令攻击
软件开发的任务书