Scala笔记整理(九):Actor和AKKA
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,[TOC]概述 Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程
千家信息网最后更新 2025年12月02日Scala笔记整理(九):Actor和AKKA
[TOC]
概述
Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用的情况,进而提升多线程编程的性能。
Spark中使用的分布式多线程框架,是Akka,是Scala的一种多线程的类库。Akka也实现了类似Scala Actor的模型,其核心概念同样也是Actor。Scala Actor模型已经在2.1.0的时候还在用,但是在2.1.1的时候已经被遗弃了,Spark开始转换用AKKA来替代Scala Actor,但是Scala Actor概念和原理都还是相同的。所以学习Scala Actor对我们学习AKKA,Spark还是有所帮助的
之所以学习Scala Actor,AKKA是因为在学习Spark源码的时,我们能看懂Spark的源码,因为在底层消息传递机制上大量使用AKKA的传送机制。
scala actor
在使用前,需要先引入maven依赖:
org.scala-lang scala-actors 2.10.5 actor单向通信
测试代码如下:
package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * 学习scala actor的基本操作 * 和java中的Runnable Thread几乎一致 * * 第一步:编写一个类,扩展特质trait Actor(scala 的actor) * 第二步:复写其中的act方法 * 第三步:创建该actor的对象,调用该对象的start()方法,启动该线程 * 第四步:通过scala的操作符"!",发送消息 * 第五步:结束的话,调用close即可 * * 模拟单向打招呼 */object ActorOps { def main(args: Array[String]): Unit = { val mFActor = new MyFirstActor() mFActor.start() // 发送消息 mFActor ! "小美,睡了吗?" mFActor ! "我去洗澡了~" mFActor ! "呵呵" }}class MyFirstActor extends Actor { override def act(): Unit = { while(true) { receive { case str: String => println(str) } } }}输出结果如下:
小美,睡了吗?我去洗澡了~呵呵使用样例类(case class)进行actor消息传递
测试代码如下:
package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * */object GreetingActor { def main(args: Array[String]): Unit = { val ga = new GreetingActor ga.start() ga ! Greeting("小美") ga ! WorkContent("装系统") }}case class Greeting(name:String)case class WorkContent(content:String)class GreetingActor extends Actor { override def act(): Unit = { while(true) { receive { case Greeting(name) => println(s"Hello, $name") case WorkContent(content) => println(s"Let's talk about sth. with $content") } } }}输出结果如下:
Hello, 小美Let's talk about sth. with 装系统actor相互通信
测试代码如下:
package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * actor之线程间,互相通信 * * studentActor * 向老师问了一个问题 * * teacherActor * 向学生做回应 * * 通信的协议: * 请求,使用Request(内容)来表示 * 响应,使用Response(内容)来表示 */object _03CommunicationActorOps { def main(args: Array[String]): Unit = { val teacherActor = new TeacherActor() teacherActor.start() val studentActor = new StudentActor(teacherActor) studentActor.start() studentActor ! Request("老李啊,scala学习为什么这么难啊") }}case class Request(req:String)case class Response(resp:String)class StudentActor(teacherActor: TeacherActor) extends Actor { override def act(): Unit = { while(true) { receive { case Request(req) => { // 向老师请求相关的问题 println("学生向老师说:" + req) teacherActor ! Request(req) } case Response(resp) => { println(resp) println("高!") } } } }}class TeacherActor() extends Actor { override def act(): Unit = { while (true) { receive { case Request(req) => { // 接收到学生的请求 sender ! Response("这个问题,需要如此搞定~") } } } }}输出结果如下:
学生向老师说:老李啊,scala学习为什么这么难啊这个问题,需要如此搞定~高!消息的同步和Future
1、Scala在默认情况下,消息都是以异步进行发送的;但是如果发送的消息是同步的,即对方接受后,一定要给自己返回结果,那么可以使用!?的方式发送消息。即:
val response= activeActor !? activeMessage2、如果要异步发送一个消息,但是在后续要获得消息的返回值,那么可以使用Future。即!!语法,如下:
val futureResponse = activeActor !! activeMessageval activeReply = future()AKKA actor
首先需要添加akka的maven依赖:
com.typesafe.akka akka-actor_2.10 2.3.16 com.typesafe.akka akka-remote_2.10 2.3.16 com.typesafe.akka akka-slf4j_2.10 2.3.16 AKKA消息传递--本地
原理如下:
_01StudentActorOps
package cn.xpleaf.bigdata.p5.myakka.p1import akka.actor.{Actor, ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}import scala.util.Random/** * 基于AKKA Actor的单向通信案例 * 学生向老师发送请求 */object _01StudentActorOps { def main(args: Array[String]): Unit = { // 第一步:构建Actor操作系统 val actorSystem = ActorSystem("StudentActorSystem") // 第二步:actorSystem创建TeacherActor的代理对象ActorRef val teacherActorRef = actorSystem.actorOf(Props[TeacherActor]) // 第三步:发送消息 teacherActorRef ! QuoteRequest() Thread.sleep(2000) // 第四步:关闭 actorSystem.shutdown() }}class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") override def receive = { case QuoteRequest() => { val random = new Random() val randomIndex = random.nextInt(quotes.size) val randomQuote = quotes(randomIndex) val response = QuoteResponse(randomQuote) println(response) } }}MessageProtocol
后面akka通信的几个测试程序都会使用到这个object,只在这里给出,后面不再给出。
package cn.xpleaf.bigdata.p5.myakka/** * akka actor通信协议 */object MessageProtocol { case class QuoteRequest() case class QuoteResponse(resp: String) case class InitSign()}object Start extends Serializableobject Stop extends Serializabletrait Message { val id: String}case class Shutdown(waitSecs: Int) extends Serializablecase class Heartbeat(id: String, magic: Int) extends Message with Serializablecase class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializablecase class Packet(id: String, seq: Long, content: String) extends Message with Serializable测试
输出结果如下:
QuoteResponse(Anything worth doing is worth overdoing)AKKA请求与响应--本地
原理如下:
TeacherActor
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.Actorimport cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}import scala.util.Random/** * Teacher Actor */class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") override def receive = { case QuoteRequest() => { val random = new Random() val randomIndex = random.nextInt(quotes.size) val randomQuote = quotes(randomIndex) val response = QuoteResponse(randomQuote)// println(response) sender ! response } }}StudentActor
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.{Actor, ActorLogging, ActorRef}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}/** * Student Actor * 当学生接收到InitSign信号之后,便向老师发送一条Request请求的消息 */class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging { override def receive = { case InitSign => { teacherActorRef ! QuoteRequest()// println("student send request") } case QuoteResponse(resp) => { log.info(s"$resp") } }}DriverApp
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.{ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSignobject DriverApp { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("teacherStudentSystem") // 老师的代理对象 val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor") // 学生的代理对象 val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor") studentActorRef ! InitSign Thread.sleep(2000) actorSystem.shutdown() }}测试
输出结果如下:
[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoingAKKA请求与响应--远程
application.conf
MyRemoteServerSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } }}MyRemoteClientSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } }}RemoteActor
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{Actor, ActorLogging}import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}class RemoteActor extends Actor with ActorLogging { def receive = { case Start => { // 处理Start消息 log.info("Remote Server Start ==>RECV Start event : " + Start) } case Stop => { // 处理Stop消息 log.info("Remote Server Stop ==>RECV Stop event: " + Stop) } case Shutdown(waitSecs) => { // 处理Shutdown消息 log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs) Thread.sleep(waitSecs) log.info("Remote Server Shutdown ==>Shutdown this system.") context.system.shutdown // 停止当前ActorSystem系统 } case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // 处理Header消息 case _ => }}AkkaServerApplication
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{ActorSystem, Props}import com.typesafe.config.ConfigFactoryobject AkkaServerApplication extends App { // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容 val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor")) val log = system.log log.info("===>Remote server actor started: " + system) // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值 system.actorOf(Props[RemoteActor], "remoteActor")}ClientActor
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.SupervisorStrategy.Stopimport akka.actor.{Actor, ActorLogging}import cn.xpleaf.bigdata.p5.myakka.{Header, Start}class ClientActor extends Actor with ActorLogging { // akka.://@:/ val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用 val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息 @volatile var connected = false @volatile var stop = false def receive = { case Start => { // 发送Start消息表示要与远程Actor进行后续业务逻辑处理的通信,可以指示远程Actor初始化一些满足业务处理的操作或数据 send(Start) if (!connected) { connected = true log.info("ClientActor==> Actor connected: " + this) } } case Stop => { send(Stop) stop = true connected = false log.info("ClientActor=> Stopped") } case header: Header => { log.info("ClientActor=> Header") send(header) } case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收远程Actor处理一个Packet消息的结果 case m => log.info("Unknown message: " + m) } private def send(cmd: Serializable): Unit = { log.info("Send command to server: " + cmd) try { remoteServerRef ! cmd // 发送一个消息到远程Actor,消息必须是可序列化的,因为消息对象要经过网络传输 } catch { case e: Exception => { connected = false log.info("Try to connect by sending Start command...") send(Start) } } }} AkkaClientApplication
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.{Header, Start}import com.typesafe.config.ConfigFactoryobject AkkaClientApplication extends App { // 通过配置文件application.conf配置创建ActorSystem系统 val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor")) val log = system.log val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用 clientActor ! Start // 发送一个Start消息,第一次与远程Actor握手(通过本地ClientActor进行转发) Thread.sleep(2000) clientActor ! Header("What's your name: Can you tell me ", 20, encrypted = false) // 发送一个Header消息到远程Actor(通过本地ClientActor进行转发) Thread.sleep(2000)}测试
服务端输出结果如下:
[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552][INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552][INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server Start ==>RECV Start event : cn.xpleaf.bigdata.p5.myakka.Start$@325737b3[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server => RECV header: (What's your name: Can you tell me ,20,false)客户端输出结果如下:
[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@192.168.43.132:2552][INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@192.168.43.132:2552][INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor==> Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor=> Header[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: Header(What's your name: Can you tell me ,20,false)
消息
结果
线程
通信
学生
老师
处理
学习
测试
输出
对象
系统
问题
小美
配置
代码
内容
单向
原理
模型
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
微软数据库的设计与实现
媒体服务器 唤醒
成都哪些软件开发是国企
软件开发公司老牌子
网络技术f5是什么意思
香港云服务器浏览youtube
网络安全信息有哪些
软件开发托管
联想服务器sr588驱动安装
微软服务器放在哪里最合适
内蒙古创新网络技术服务怎么样
2k17打不开服务器
客服端服务器不一致
昆明网络安全保障培训机构
如何做好公司级网络安全
电子证件安全服务器 没电
做vfp数据库
机票旅游app软件开发
上海名创网络技术有限公司
武林外传的服务器叫什么
金融信息网络安全防范
服务器 百度
网络安全的儿童图画
松江区企业网络技术服务口碑推荐
重庆浪潮服务器虚拟化建设
我的世界服务器vip
广州纺织软件开发
软件开发中心功能测试岗
高并发数据库分库分表
上海名创网络技术有限公司