Akka学习 实现workcount
发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,package com.dcx.scala.actorimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import scala.colle
千家信息网最后更新 2025年12月04日Akka学习 实现workcount
package com.dcx.scala.actorimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import scala.collection.mutable.HashMapimport scala.collection.mutable.ListBufferimport scala.io.Source/** * 思路: * 要有个Server * 要有个Client去通信,client统计文本后把(qy,3)输出给Server;Server再把所有的qy聚合,放到ListBuffer中 */object AkkaWordCount {// 可变长List val list = new ListBuffer[HashMap[String,Int]] def main(args: Array[String]): Unit = {// 输入数据文本 val files: Array[String] = Array("D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt") //存放接收到的每个actor处理的结果数据 //存放有actor返回结果的Future数据 //拿ActorSystem是一个静态工厂 val weChatApp = ActorSystem("WeChatApp") //拿到两个Actor的通信地址 val akkaServerRef: ActorRef = weChatApp.actorOf(Props[AkkaServer],"jianjian1") val clientRef: ActorRef = weChatApp.actorOf(Props(new Client(akkaServerRef)),"jianjian") for (file <- files) { clientRef ! file }// 让该线程先睡一下,过早进入死循环会导致list没有3个,一直循环不出来 Thread.sleep(1000)// 如果list把三个文件都放满了,就退出循环 while(true){ if(list.size == 3){// 输出list println(list(list.size -1)) return } } }}//把每次聚合后的值都发送给AkkaServerclass Client(val serverRef:ActorRef) extends Actor { override def receive: Receive = { {// 偏函数 常用作模式匹配// case filePath: String => {//// map阶段// val list: List[String] = Source.fromFile(filePath).getLines().toList// val words: List[String] = list.flatMap(_.split(" "))// val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size)// //异步发送结果数据 res发送到Server,去模式匹配// serverRef ! res// } case filePath:String => { val list: List[String] = Source.fromFile(filePath).getLines().toList val words: List[String] = list.flatMap(_.split(" "))// 得出: (qy,3) 格式 val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) serverRef ! res } } }}import scala.collection.mutable.HashMapclass AkkaServer extends Actor { private var hashMap: HashMap[String, Int] = new HashMap[String, Int] override def receive: Receive = { case context: Map[String, Int] =>{// (qy,3) context.map( (map:(String,Int)) => {// 聚合 val value: Any = hashMap.getOrElse(map._1,None) if(value != None){ hashMap(map._1) = value.asInstanceOf[Int] + map._2 }else{ hashMap(map._1) = map._2 } } ) AkkaWordCount.list += hashMap } }}
数据
结果
文本
通信
两个
地址
工厂
思路
格式
模式
通信地址
阶段
静态
可变
处理
统计
输入
输出
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全测评师职责
腾讯wifi管家使用网络安全
惠普的服务器
软件开发办公室怎么设计
普通发票出现数据库连接
魔域服务器网关工具
地理数据库实时更新
pk数据库啥意思
阿里云服务器怎么挂网店
机房 网络安全应急预案
网络安全发展前沿热议
华为 服务器 ppt
ps4如何清除游戏数据库
2020网络安全日题目
天津大学国家网络安全宣传周
ssr酸酸乳服务器订阅
计算机网络技术专科有前途吗
服务器安全加固系统工作原理
网络技术学什么最赚钱
网页的服务器如何配置
wow大服务器实装
打造一支网络安全 2018
中医药期刊文献数据库
东忠软件开发公司招聘信息
远程连接阿里云数据库
腾讯服务器被挖光纤
多媒体软件开发平台安装
工行财资云合作软件开发商
瑞丽服务器显卡联系方式
软件开发外汇数据来源