早些天看Hadoop源码的时候了解到Hadoop分布式环境中各个组件间的通信采用的RPC,由于暂无时间深入分析Hadoop中的RPC实现方式。参考网上资料学习跟例子实现简易RPC框架。
一、什么是RPC
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。JDK内部提供RMI实现RPC(仅限于JAVA对象的远程调用)。
二、利用RMI实现RPC
想要了解RMI详细知识请查看本人另一编blog《如何发布RMI服务》。
1、首先定义RMI服务器接口
package org.bird.rmi; import java.io.Serializable; import java.rmi.AccessException; import java.rmi.NotBoundException; import java.rmi.Remote; import java.rmi.RemoteException; public interface RMIServer extends Remote,Serializable { static String DEFAULT_HOST = "127.0.0.1"; static int DEFAULT_PORT = 8888; /** * 注册本地对象 * @param interfaceDefiner * @param impl * @throws InstantiationException * @throws IllegalAccessException */ public void registerLocalObject(Class<?> interfaceDefiner, Class<?> impl) throws RemoteException, InstantiationException, IllegalAccessException; /** * 注册远程对象 * @param interfaceDefiner * @param impl * @throws RemoteException * @throws IllegalAccessException * @throws InstantiationException */ public void registerRemoteObject(Class<?> interfaceDefiner, Class<?> impl) throws RemoteException, InstantiationException, IllegalAccessException; /** * 卸载指定对象 * @param interfaceDefiner * @throws RemoteException * @throws NotBoundException * @throws ClassNotFoundException */ public void unregisterObject(Class<?> interfaceDefiner) throws RemoteException, NotBoundException, ClassNotFoundException; /** * 卸载所有对象 * @throws AccessException * @throws RemoteException * @throws NotBoundException * @throws ClassNotFoundException */ public void unregisterAllObjects() throws AccessException, RemoteException, NotBoundException, ClassNotFoundException; /** * 获取对象 * @param interfaceDefiner * @return */ public <T> T getObject(Class<T> interfaceDefiner) throws RemoteException; /** * 启动服务 * @throws RemoteException */ public void start() throws RemoteException; /** * 关闭服务器 * @param remote 是否同时关闭远程RMI服务 * @throws AccessException * @throws RemoteException * @throws NotBoundException * @throws ClassNotFoundException */ public void stop(boolean remote) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException; /** * 关闭远程服务 * @throws RemoteException */ public void remoteStop() throws RemoteException; /** * 判断服务器运行状态 * @return * @throws RemoteException */ public boolean isRunning() throws RemoteException; }
2、接着实现RMI服务器相关功能的实现类
package org.bird.rmi; import java.rmi.AccessException; import java.rmi.ConnectException; import java.rmi.NotBoundException; import java.rmi.Remote; import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.UnicastRemoteObject; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.bird.rmi.support.Listener; public class RMIServerImpl extends UnicastRemoteObject implements RMIServer { /** * */ private static final long serialVersionUID = 1L; private boolean isRuning; private Listener listener; /** 本地主机RMI注册表 */ private Registry localRegistry = null; /** 远程主机RMI注册表 */ private Registry remoteRegistry = null; /** 远程服务器 */ private RMIServer remoteServer = null ; /** 缓存本地对象实例 */ private Map<String,Object> registeredLocalObjectMap = new HashMap<String,Object>(); /** 缓存远程对象实例 */ private Map<String,Remote> registeredRemoteObjectMap = new HashMap<String,Remote>(); public RMIServerImpl() throws RemoteException { this(DEFAULT_HOST, DEFAULT_PORT); } public RMIServerImpl(int port) throws RemoteException { this(DEFAULT_HOST, port); } public RMIServerImpl(String cip, int cport) throws RemoteException { registerLocalServer(cip, cport); } public RMIServerImpl(String cip,int cport, String sip, int sport) throws RemoteException { getRemoteServer(sip, sport); registerLocalServer(cip, cport); } private void registerLocalServer(String cip, int cport) throws AccessException, RemoteException { System.setProperty("java.rmi.server.hostname", cip); if(localRegistry == null) { try { localRegistry = LocateRegistry.getRegistry(cip, cport); localRegistry.list(); } catch (Exception e) { try { localRegistry = LocateRegistry.createRegistry(cport);//在本地主机创建并导出RMI注册表 } catch (RemoteException e1) { throw new RuntimeException(e1); } } } localRegistry.rebind(getKey(cip, cport), this); } private void getRemoteServer(String sip, int sport) throws AccessException, RemoteException { if(null == sip || "".equals(sip.trim())){ return; } remoteRegistry = LocateRegistry.getRegistry(sip, sport); try { remoteServer = (RMIServer) remoteRegistry.lookup(getKey(sip, sport)); } catch (ConnectException e) { remoteRegistry = null; throw new RemoteException("无法获取远程服务:" + getKey(sip, sport)); } catch (NotBoundException e) { remoteRegistry = null; throw new RemoteException("无法获取远程服务:" + getKey(sip, sport)); } } public void registerLocalObject(Class<?> interfaceDefiner, Class<?> impl) throws InstantiationException, IllegalAccessException { Object instance = impl.newInstance(); registeredLocalObjectMap.put(interfaceDefiner.getName(), instance); System.out.println("注册本地对象:" + interfaceDefiner.getName()); } public void registerRemoteObject(Class<?> interfaceDefiner, Class<?> impl) throws RemoteException, InstantiationException, IllegalAccessException { Object object = impl.newInstance();//反射实例化 if(!(object instanceof Remote)){ throw new ClassCastException("cannot be cast to java.rmi.Remote"); } System.out.println("注册远程对象:" + interfaceDefiner.getName()); Remote instance = (Remote) object; registeredRemoteObjectMap.put(interfaceDefiner.getName(), instance); if (instance instanceof UnicastRemoteObject) { localRegistry.rebind(interfaceDefiner.getName(), instance);//更新对此注册表中指定name的远程引用。 if(remoteServer != null) { remoteServer.registerLocalObject(interfaceDefiner, impl);//对象实例直接推送到远程服务器端 } }else{ Remote stub = UnicastRemoteObject.exportObject(instance, 0); localRegistry.rebind(interfaceDefiner.getName(), stub);//更新对此注册表中指定name的远程引用。 if(remoteServer != null) { remoteServer.registerLocalObject(interfaceDefiner, stub.getClass());//对象实例直接推送到远程服务器端 } } } public void unregisterObject(Class<?> interfaceDefiner) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException { unregisterObject(interfaceDefiner.getName()); } private void unregisterObject(String key) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException { if(registeredLocalObjectMap.containsKey(key)) { registeredLocalObjectMap.remove(key); System.out.println("注销本地对象:" + key); }else if (registeredRemoteObjectMap.containsKey(key)) { localRegistry.unbind(key);//移除RMI注册表中的绑定 Remote remote = registeredRemoteObjectMap.get(key); if(remote != null) { UnicastRemoteObject.unexportObject(remote, true);//从RMI中移除远程对象 remote = null; } registeredRemoteObjectMap.remove(key); if(remoteServer != null) { remoteServer.unregisterObject(Class.forName(key));//将推送到远程服务器上对象实例注销 } System.out.println("注销远程对象:" + key); } } public void unregisterAllObjects() throws AccessException, RemoteException, NotBoundException, ClassNotFoundException { registeredLocalObjectMap.clear();//注销本地对象 List<String> keys = new ArrayList<String>(); for(String key : registeredRemoteObjectMap.keySet()) { keys.add(key); } for(String key : keys) { unregisterObject(key); } } public <T> T getObject(Class<T> interfaceDefiner) throws RemoteException { String key = interfaceDefiner.getName(); if(registeredLocalObjectMap.containsKey(key)) { System.out.println("调用本地对象:" + interfaceDefiner.getName()); return (T) registeredLocalObjectMap.get(key); } if(registeredRemoteObjectMap.containsKey(key)) { System.out.println("调用远程对象:" + interfaceDefiner.getName()); return (T) registeredRemoteObjectMap.get(key); } if(remoteServer != null) { return remoteServer.getObject(interfaceDefiner); } return null; } public void start() throws RemoteException { listener = new Listener(this); this.isRuning = true; listener.start(); } public void stop(boolean remote) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException { unregisterAllObjects(); String[] names = localRegistry.list(); for(String name : names) { localRegistry.unbind(name); } UnicastRemoteObject.unexportObject(localRegistry, true); localRegistry = null; if(remote && remoteServer != null){ remoteServer.remoteStop(); } System.out.println("本地RMI服务器停止..."); System.exit(0); } public void remoteStop() throws RemoteException { this.isRuning = false; } private String getKey(String name, int port) { return name + "|" + port; } public boolean isRunning() { return isRuning; } }
3、然后再编写一个监听器用于监听服务器
package org.bird.rmi.support; import java.rmi.AccessException; import java.rmi.NotBoundException; import java.rmi.RemoteException; import org.bird.rmi.RMIServer; /** * RMI服务监听器 * @author liangjf * */ public class Listener extends Thread { private RMIServer server; public Listener(RMIServer server) { this.server = server; } public void run() { System.out.println("RMI服务器启动..."); try { while(server.isRunning()) { } } catch (RemoteException e) { e.printStackTrace(); } try { server.stop(false); } catch (AccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (RemoteException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (NotBoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
4、到目前一个简单的RPC框架就完成了。接着我们编写两个测试用例
package org.bird.rmi.test.echo; import java.rmi.Remote; import java.rmi.RemoteException; public interface Echo extends Remote { public String sayHello(String name) throws RemoteException; }
package org.bird.rmi.test.echo; import java.rmi.Remote; import java.rmi.RemoteException; public interface Hello extends Remote { public String sayHello(String name) throws RemoteException; }
package org.bird.rmi.test.echo; import java.rmi.RemoteException; public class EchoImpl implements Echo { public EchoImpl() throws RemoteException { super(); } public String sayHello(String name) { return "Hello, " + name; } }
package org.bird.rmi.test.echo; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; public class RemoteHello extends UnicastRemoteObject implements Hello { public RemoteHello() throws RemoteException { super(); // TODO Auto-generated constructor stub } /** * */ private static final long serialVersionUID = 1L; public String sayHello(String name) throws RemoteException { return "Hello, " + name; } }
再编写一个服务器测试入口类
package org.bird.rmi.test; import java.rmi.NotBoundException; import java.rmi.RemoteException; import org.bird.rmi.RMIServer; import org.bird.rmi.RMIServerImpl; import org.bird.rmi.test.echo.Echo; import org.bird.rmi.test.echo.EchoImpl; import org.bird.rmi.test.echo.Hello; import org.bird.rmi.test.echo.RemoteHello; public class MainServer { /** * @param args * @throws RemoteException * @throws IllegalAccessException * @throws InstantiationException * @throws NotBoundException */ public static void main(String[] args) throws RemoteException, InstantiationException, IllegalAccessException, NotBoundException { RMIServer server = new RMIServerImpl(8888); server.registerRemoteObject(Hello.class, RemoteHello.class); server.registerRemoteObject(Echo.class, EchoImpl.class); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("服务端JVM关闭..."); } }); server.start(); } }
最后写两个客户端的测试入口类
package org.bird.rmi.test; import java.rmi.AccessException; import java.rmi.NotBoundException; import java.rmi.RemoteException; import org.bird.rmi.RMIServer; import org.bird.rmi.RMIServerImpl; import org.bird.rmi.test.echo.Hello; public class MainClient1 { /** * @param args * @throws NotBoundException * @throws RemoteException * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException * @throws InterruptedException * @throws AccessException */ public static void main(String[] args) throws RemoteException, NotBoundException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException { RMIServer client = new RMIServerImpl(RMIServer.DEFAULT_HOST, 9998, RMIServer.DEFAULT_HOST, RMIServer.DEFAULT_PORT); Hello hello = client.getObject(Hello.class); System.out.println(hello.sayHello("ljf")); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("客户端1关闭....."); } }); client.stop(false); } }
package org.bird.rmi.test; import java.rmi.AccessException; import java.rmi.NotBoundException; import java.rmi.RemoteException; import org.bird.rmi.RMIServer; import org.bird.rmi.RMIServerImpl; import org.bird.rmi.test.echo.Echo; public class MainClient2 { /** * @param args * @throws NotBoundException * @throws RemoteException * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException * @throws InterruptedException * @throws AccessException */ public static void main(String[] args) throws RemoteException, NotBoundException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException { RMIServer client = new RMIServerImpl(RMIServer.DEFAULT_HOST, 9999, RMIServer.DEFAULT_HOST, RMIServer.DEFAULT_PORT); Echo echo = client.getObject(Echo.class); System.out.println(echo.sayHello("飞飞")); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("客户端2关闭....."); } }); client.stop(true); } }
5、测试用例编写完成后将RMIServer、RMIServerImpl、Listener、Echo、Hello、EchoImp、RemoteHello、MainServer这些类打包成可运行的server.jar;RMIServer、RMIServerImpl、Listener、Hello、MainClient1这些类打包成可以运行的client1.jar;RMIServer、RMIServerImpl、Listener、Echo、MainClient2这些类打包成可以运行的client2.jar
6、首先在命令行下运行server.jar
7、再另打开一窗口运行client1.jar查看一下运行情况
8、另外再打开一个窗口运行client2.jar查看客户端与服务端的变化情况
总结:上面实现的RMI框架服务端与客户端可以相互调用(既是服务端也是客户端),可以注册本地对象或者注册远程对象,对象的调用就跟本地调用一样对上层使用者来说是完全透明的。
相关推荐
基于Java的简易RPC框架 项目简介 本项目是一个简易的RPC(远程过程调用)框架,旨在通过模拟实现一个基本的RPC调用流程,帮助理解RPC的核心概念和实现原理。服务端采用Tomcat服务器,消费端使用HTTP协议发送网络...
本项目提供了一个简易版的Java RPC框架实现,旨在模仿著名的Dubbo框架,但采用了更基础的Socket通信方式进行分布式服务的搭建。以下是这个项目的核心知识点: 1. **RPC原理**:RPC使得客户端可以像调用本地方法一样...
该项目为基于Java语言的简易RPC框架设计源码,包含66个文件,其中包括51个Java源文件、5个XML配置文件、3个序列化工具类、2个Markdown文件、2个属性配置文件以及1个Git忽略文件和1个注册中心配置文件。此框架适用于...
在这个基于Java实现的简易RPC框架项目中,我们将深入探讨如何构建这样一个框架,并实现对`printf`函数的远程调用。 首先,我们要理解RPC的基本原理。RPC使得客户端可以像调用本地方法一样调用远程服务器上的方法,...
该项目是一个基于Java和Vert.x技术的简易RPC框架设计源码,共包含60个文件,主要由45个Java源文件、9个XML配置文件以及少量其他类型文件构成。该框架旨在提供一种高效、简洁的远程过程调用解决方案,适用于需要跨...
简介 demo-rpc(标准maven工程) 使用纯Java socket及简单多线程技术,不依赖任何第三方库类,实现简单实现类似dubbo的rpc调用。仅用于学习了解rpc调用过程, 实现略显简单,只体现rpc调用的关键步骤,存在很多优化细节,...
guide 目前只实现了RPC框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自我完善。 通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/...
# 基于Netty+Kyro+Zookeeper的RPC框架 [中文](./README.md)|English ## 前言 通过这个简易的轮子,你可以学到 RPC 的底层原理及原理以及各种 Java 编码实践的运用。 ## 介绍 由于 Guide哥自身精力和能力...
本文档将通过一系列章节详细介绍如何基于Netty 4.1版本实现一个简易的RPC框架。 #### 二、基础知识概述 ##### 2.1 RPC概念 - **定义**:RPC(Remote Procedure Call Protocol)即远程过程调用协议,它允许程序...
在这个Java简易RPC框架中,"f4框架"可能是框架的一个组成部分,可能是用于序列化、网络通信或者服务治理的组件。"caf_分布式 herselfcz9"这部分标签信息可能是指框架的作者或版本,"caf"可能是一个简写或代号,...
rpc框架前言学习javaGuide,自己动手造个轮子,通过这个简易的轮子,可以学到RPC的扭曲原理和原理以及各种Java编码实践的运用。介绍是一种基于Netty + Kyro + Zookeeper实现的RPC框架。设计思路一个基本的RPC框架...
xml java系统源码 handwriting ...使用Java默认的序列化以及传统Socket通信实现简易的rpc框架 hw-mybatis 实现了mybatis的主要的核心功能,目前在xml解析以及查询返回值这些地方采用硬编码实现 待续中...
熔断降级机制简易RPC框架-SPI熔断降级实现影响上下文机制,后续更新解决基于注解的锁Spring boot实践WEBValidator (未同步代码)多字段动态运算符HandlerMethodArgumentResolver (未同步代码)接口参数注入消息...
在一次 RPC 调用流程中,这三个组件是这样交互的: 服务端在启动后,会将它提供的服务列表发布到注册中心,客户端向注册中心订阅服务地址; 客户端会通过本地代理模块 Proxy 调用服务端,Proxy 模块收到负责将方法、...
nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...
zrpc是一个针对Java开发的轻量级RPC框架,旨在简化服务间的通信,提高系统的可扩展性和解耦性。 在zrpc框架中,主要涉及以下几个核心概念和技术点: 1. **服务提供者(Service Provider)**:服务提供者是拥有特定...
nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...
通过这个简易的轮子,你可以学到 RPC 的底层原理和原理以及各种 Java 编码实践的运用。 你甚至可以把 当做你的毕设/项目经验的选择,这是非常不错!对比其他求职者的项目经验都是各种系统,造轮子肯定是更加能赢得...
主程序使用 SpringBoot 实现, P2P 传输这块使用的是 t-io 网络框架。 运行环境为 JDK1.8 以上版本。 项目模块 blockj-base 基础公共的工具包,如加密,区块,消息等数据模型,数据存储等。 blockj-miner 区块链主...
nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...