基于akka怎样实现RPC
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。目前的工作在基于akka实现数据服务总线,Akka 2.3
千家信息网最后更新 2025年12月02日基于akka怎样实现RPC
这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
目前的工作在基于akka实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用),这篇文章将会介绍一种实现方式。akka rpc java目录[-]akka-rpc(基于akka的rpc的实现)RPC实现原理Server端核心代码Client端核心代码 Demoakka-rpc(基于akka的rpc的实现)代码:http://git.oschina.net/for-1988/Simples目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。RPC远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。实现原理整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Classclz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。Server端核心代码public class RpcServer extends UntypedActor { private Map proxyBeans; public RpcServer(Map , Object> beans) { proxyBeans = new HashMap (); for (Iterator > iterator = beans.keySet().iterator(); iterator .hasNext();) { Class> inface = iterator.next(); proxyBeans.put(inface.getName(), beans.get(inface)); } } @Override public void onReceive(Object message) throws Exception { if (message instanceof RpcEvent.CallBean) { //返回Server端的接口实现类的实例 CallBean event = (CallBean) message; ReturnBean bean = new ReturnBean( proxyBeans.get(event.getBeanName()), getSelf()); getSender().tell(bean, getSelf()); } else if (message instanceof RpcEvent.CallMethod) { CallMethod event = (CallMethod) message; Object bean = proxyBeans.get(event.getBeanName()); Object[] params = event.getParams(); List > paraTypes = new ArrayList >(); Class>[] paramerTypes = new Class>[] {}; if (params != null) { for (Object param : params) { paraTypes.add(param.getClass()); } } Method method = bean.getClass().getMethod(event.getMethodName(), paraTypes.toArray(paramerTypes)); Object o = method.invoke(bean, params); getSender().tell(o, getSelf()); } }}启动Serverpublic static void main(String[] args) { final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2551) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcServer]")) .withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("EsbSystem", config); // Server 加入发布的服务 Map , Object> beans = new HashMap , Object>(); beans.put(ExampleInterface.class, new ExampleInterfaceImpl()); system.actorOf(Props.create(RpcServer.class, beans), "rpcServer"); }Client端核心代码 RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。public class RpcClient extends Thread { private ActorSystem system; private ActorRef rpc; private ActorRef clientServer; private static RpcClient instance = null; public RpcClient() { this.start(); final Config config = ConfigFactory .parseString("akka.remote.netty.tcp.port=" + 2552) .withFallback( ConfigFactory .parseString("akka.cluster.roles = [RpcClient]")) .withFallback(ConfigFactory.load()); system = ActorSystem.create("EsbSystem", config); int totalInstances = 100; Iterable routeesPaths = Arrays.asList("/user/rpcServer"); boolean allowLocalRoutees = false; ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup( new AdaptiveLoadBalancingGroup( HeapMetricsSelector.getInstance(), Collections. emptyList()), new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, "RpcServer")); rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall"); clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc), "client"); Cluster.get(system).registerOnMemberUp(new Runnable() { //加入集群成功后的回调事件,恢复当前线程的中断 @Override public void run() { synchronized (instance) { System.out.println("notify"); instance.notify(); } } }); } public static RpcClient getInstance() { if (instance == null) { instance = new RpcClient(); synchronized (instance) { try { //中断当前线程,等待加入集群成功后,恢复 System.out.println("wait"); instance.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return instance; } public T getBean(Class clz) { Future
上述就是小编为大家分享的基于akka怎样实现RPC了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
集群
方法
对象
过程
代码
功能
接口
核心
服务
成功
就是
时候
端的
计算机
通信
事件
内容
分布式
动态
原理
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
广州智慧电梯软件开发费用
服务器贵不贵
网络安全灵域 把已经被发现
计算机专业有软件开发吗
广州太平洋网络技术公司地址
千叶互联网科技短视频变现
网络安全类招聘
交通网络安全网络格言
网络安全数据保护系统
app区块链软件开发
arcgis数据库怎么删除
中国网络安全张硕
我的世界服务器图片信息
高并发是一个数据库
河源教师网络技术基础知识
七宇互联网科技
陕西有没有lol服务器云空间
中科院软件开发厉害吗
广州博纳斯互联网科技公司
软件开发岗位实习任务安排
网络安全社会工程学是啥意思
网络安全审查是底线
网络安全故障
网络安全防控研究
宁波营销软件开发流程
邮件服务器协议是
h5创建本地数据库
上海社交软件开发外包
服务器存储数据用关系型数据库
软件开发同步服务器