spark之master与worker通信模型讲解
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,通信模型架构图master 端代码import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFacto
千家信息网最后更新 2025年12月03日spark之master与worker通信模型讲解
通信模型架构图
master 端代码import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFactory// 需要导入这2个包 封装一些属性。class MasterActor extends Actor { //在开始之前调用一次 override def preStart(): Unit = { } //用于接收消息 override def receive: Receive = { case "started" => { println("Master has been started!") //进入这个分支,说明这个Master线程已经启动完成 } case "connecting" => { println("Master has been get connect from Worker!") println("a Worker Node has been register!") //返回消息给Worker sender() ! "connected" Thread.sleep(1000) } case "stoped" => { } }}object Demo01MasterActor { def main(args: Array[String]) { //设置MasterIP和端口 val masterHost = "localhost" val masterPort = "1234" //端口和IP封装到akka架构,获取一个属性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$masterHost" |akka.remote.netty.tcp.port = "$masterPort" """.stripMargin val config = ConfigFactory.parseString(conStr) val masterActorSystem = ActorSystem("MasterActorSystem", config) val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor") masterActor ! "started" masterActorSystem.awaitTermination(); }}worker端代码import akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.typesafe.config.ConfigFactoryclass WorkerActor extends Actor { var masterURL: ActorSelection = null //启动Actor之前执行,做初始化工作 override def preStart(): Unit = { //配置访问Master的URL //MasterIP:localhost //MasterPort:8888(根据Master配置) //Master的 ActorSystem对象:MasterActorSystem、MasterActor masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor") } override def receive: Receive = { case "started" => { println("Worker has been started!") //进入这个分支,说明这个Worker线程已经启动完成 //可以去向Master注册 //请求和Master建立连接 masterURL ! "connecting" } case "connected" => { println("Worker 收到来自Master确认信息!") } case "stoped" => { } }}object Demo01WorkerActor { def main(args: Array[String]) { //初始化MastereIP和端口、WorkerIP和端口 // val masterHost = args(0) // val masterPort = args(1) // val workerHost = args(2) // val workePort = args(3) val masterHost = "localhost" val masterPort = "8888" val workerHost = "localhost" val workePort = "8889" //端口和IP封装到akka架构,获取一个属性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$workerHost" |akka.remote.netty.tcp.port = "$workePort" """.stripMargin val config = ConfigFactory.parseString(conStr) val workerActorSystem = ActorSystem("WorkerActorSystem", config) val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor") workerActor ! "started" workerActorSystem.awaitTermination(); }}
端口
配置
属性
封装
代码
分支
文件
架构
消息
线程
模型
通信
信息
去向
对象
工作
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
人才成网络安全驱动力
意大利警察数据库
数据库怎样找出不同的数据库
网站里下载数据库
小学生近视数据库
罗斯文数据库数据表
2000人游戏服务器搭建
网信办网络安全审查办
群晖服务器文件夹
网络安全维护价格
网络安全指那些方面
山东硕博软件开发
工艺芯片服务器
java数据库密码加密解密
刺客信条连接服务器停止工作
网络安全这个话怎么办
美国今天的网络安全了吗
数据库日期有效性
工程应用软件开发技术百科
湖南出口外贸软件开发
昆明网络安全宣传周视频
维护网络安全方法设计
京东的网络安全措施和技术
海口直播软件开发多少钱
浦东新区网络技术服务产品
软件开发大专选择什么专业
北京后端软件开发程序员待遇
手机远程服务器已关闭是什么原因
东塔网络安全教学视频
简述数据库的安全性策略