千家信息网

Actor并行化的wordcount怎么实现

发表于:2025-11-15 作者:千家信息网编辑
千家信息网最后更新 2025年11月15日,本篇内容介绍了"Actor并行化的wordcount怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有
千家信息网最后更新 2025年11月15日Actor并行化的wordcount怎么实现

本篇内容介绍了"Actor并行化的wordcount怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

在scala中她能实现很强大的功能,他是基于并发机制的一个事件模型

我们现在学的scala2.10.x版本就是之前的Actor

同步:在主程序上排队执行的任务,只有前一个任务执行完毕后,才能执行下一个任务

异步:指不进入主程序,而进入"任务对列"的任务,只有等主程序任务执行完毕,"任务对列"开始请求主程序,请求任务执行,该任务会进入主程序

java

共享变量 -- 加锁

会出现锁死问题

scala

Actor不共享数据

没有锁的概念

Actor通信之间需要message(通信)

Aactor执行顺序

1.首先调用start()方法启动Actor

2.调用start()方法后act()方法会被执行

3.Actor之间进行发送消息

Actor发送消息的三种方式

! -> 发送异步消息,没有返回值

!? -> 发送同步消息,有返回值,会有线程等待

!! -> 发送异步消息,有返回值,返回值类型Future[Any](用来获取异步操作结果)

Actor并行执行

//注意,这两个actor会并行执行,当其中一个for循环结束后,actor结束

object ActorDemo01 {

def main(args: Array[String]): Unit = {

MyActor1.start()

MyActor2.start()

}

}

object MyActor1 extends Actor{

override def act(): Unit = {

for (i <- 1 to 10){

println(s"actor => $i")

Thread.sleep(2000)

}

}

object MyActor2 extends Actor{

override def act(): Unit = {

for (i <- 1 to 5){

println(s"actor2 => $i")

Thread.sleep(2000)

}

}

}

}

用Actor不断接受消息

执行第一种方式,异步

object ActorDemo02 {

def main(args: Array[String]): Unit = {

val actor: MyActor = new MyActor

actor.start()

//并行执行

actor ! "start" // !->异步

actor ! "stop"

println("发送完成")

}

}

class MyActor extends Actor{

override def act(): Unit = {

while (true){ //死循环

receive { //接收

case "start" => {

println("starting")

Thread.sleep(1000)

println("started")

}

case "stop" => {

println("stopping")

Thread.sleep(1000)

println("stopped")

}

}

}

}

}

第二种方式:利用react来代替receive,也就是说react线程可复用,比receive更高效

object ActorDemo03 {

def main(args: Array[String]): Unit = {

val actor: MyActor3 = new MyActor3

actor.start()

actor ! "start"

actor ! "stop"

println("成功了")

}

}

class MyActor3 extends Actor{

override def act(): Unit = {

loop {

react{

case "start" =>{

println("starting")

Thread.sleep(1000)

println("sarted")

}

case "stop" =>{

println("stoppting")

Thread.sleep(1000)

println("stopped")

}

}

}

}

}

结合样例类练习Actor发送消息

//创建样例类

case class AsyncMsg(id: Int, msg: String)

case class SyncMsg(id: Int, msg: String)

case class ReplyMsg(id: Int, msg: String)

object ActorDemo01 extends Actor {

override def act(): Unit = {

while (true) {

receive {

case "start" => println("starting...")

case AsyncMsg(id, msg) =>

{

println(s"id:$id,msg:$msg")

sender ! ReplyMsg(1,"sucess") //接收到消息后返回响应消息

}

case SyncMsg(id,msg) => {

println(s"id:$id,msg:$msg")

sender ! ReplyMsg(2,"sucess")

}

}

}

}

}

object ActorTest{

def main(args: Array[String]): Unit = {

val actor: Actor = ActorDemo01.start()

// //异步发送消息,没有返回值

// actor ! AsyncMsg(3,"heihei")

// println("异步消息发送完成,没有返回值")

// //同步发送消息,有返回值

// val text: Any = actor !? SyncMsg(4,"OK")

// println(text)

// println("同步消息发送成功")

//异步发送消息,有返回值,返回类型为Future[Any]

val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的")

Thread.sleep(2000)

if (reply.isSet){

val applyMsg: Any = reply.apply()

println(applyMsg)

}else{

println("Nothing")

}

}

}

Actor并行化的wordcount

class Task extends Actor {

override def act(): Unit = {

loop {

react {

case SubmitTask(fileName) => {

val contents = Source.fromFile(new File(fileName)).mkString

val arr = contents.split("\r\n")

val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)

//val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))

sender ! ResultTask(result)

}

case StopTask => {

exit()

}

}

}

}

}

object WorkCount {

def main(args: Array[String]) {

val files = Array("c://words.txt", "c://words.log")

val replaySet = new mutable.HashSet[Future[Any]]

val resultList = new mutable.ListBuffer[ResultTask]

for(f <- files) {

val t = new Task

val replay = t.start() !! SubmitTask(f)

replaySet += replay

}

while(replaySet.size > 0){

val toCumpute = replaySet.filter(_.isSet)

for(r <- toCumpute){

val result = r.apply()

resultList += result.asInstanceOf[ResultTask]

replaySet.remove(r)

}

Thread.sleep(100)

}

val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))

println(finalResult)

}

}

case class SubmitTask(fileName: String)

case object StopTask

case class ResultTask(result: Map[String, Int])

"Actor并行化的wordcount怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

消息 任务 主程序 同步 方式 方法 成功 之间 内容 只有 更多 知识 类型 线程 循环 通信 实用 强大 学有所成 接下来 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 弦米网络技术 敏捷软件开发讲解 淮南旧服务器回收今日报价 长春网络安全特训营 仙居县嗨玩网络技术工作室 网络安全法中应当遵循 新中大 连接数据库 社团管理系统软件开发计划 巨杉数据库上班时间 最大的cg渲染服务器 服务器安装forge 上海乐五互联网科技有限公司 微信数据库基地在哪里 华三服务器交换机不亮灯 数据库能再次备份之前的数据 有关计算机网络安全 微博5e数据库 网络安全金融行业市场概述 服务器安全狗ip黑名单怎么弄 德宏网络安全前途 微信红包定位软件开发定制 深圳大腕互联网科技有限公司 网络安全和信息化期刊投稿 给数据库中数据确定展示顺序 服务器可以组装双硬盘吗 联通怎么开虚拟服务器 苹果手机邮箱连接服务器失败 网络安全宣传答题答案 汉王考勤管理系统服务器登录 广电网络技术部年终个人工作总结
0