`

Java实现简易RPC框架(一)

 
阅读更多

早些天看Hadoop源码的时候了解到Hadoop分布式环境中各个组件间的通信采用的RPC,由于暂无时间深入分析Hadoop中的RPC实现方式。参考网上资料学习跟例子实现简易RPC框架。

一、什么是RPC

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。JDK内部提供RMI实现RPC(仅限于JAVA对象的远程调用)。



二、利用RMI实现RPC

想要了解RMI详细知识请查看本人另一编blog《如何发布RMI服务》。

1、首先定义RMI服务器接口

package org.bird.rmi;

import java.io.Serializable;
import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;

public interface RMIServer extends Remote,Serializable {
	
	static String DEFAULT_HOST = "127.0.0.1";
	static int DEFAULT_PORT = 8888;
	
	/**
	 * 注册本地对象
	 * @param interfaceDefiner
	 * @param impl
	 * @throws InstantiationException
	 * @throws IllegalAccessException
	 */
	public void registerLocalObject(Class<?> interfaceDefiner, Class<?> impl) throws RemoteException, InstantiationException, IllegalAccessException;
	
	/**
	 * 注册远程对象
	 * @param interfaceDefiner
	 * @param impl
	 * @throws RemoteException
	 * @throws IllegalAccessException 
	 * @throws InstantiationException 
	 */
	public void registerRemoteObject(Class<?> interfaceDefiner, Class<?> impl) throws RemoteException, InstantiationException, IllegalAccessException;
	
	/**
	 * 卸载指定对象
	 * @param interfaceDefiner
	 * @throws RemoteException
	 * @throws NotBoundException 
	 * @throws ClassNotFoundException 
	 */
	public void unregisterObject(Class<?> interfaceDefiner) throws RemoteException, NotBoundException, ClassNotFoundException;
	
	/**
	 * 卸载所有对象
	 * @throws AccessException
	 * @throws RemoteException
	 * @throws NotBoundException
	 * @throws ClassNotFoundException 
	 */
	public void unregisterAllObjects() throws AccessException, RemoteException, NotBoundException, ClassNotFoundException;
	
	/**
	 * 获取对象
	 * @param interfaceDefiner
	 * @return
	 */
	public <T> T getObject(Class<T> interfaceDefiner) throws RemoteException;
	
	/**
	 *  启动服务
	 * @throws RemoteException
	 */
	public void start() throws RemoteException;
	
	/**
	 * 关闭服务器
	 * @param remote 是否同时关闭远程RMI服务
	 * @throws AccessException
	 * @throws RemoteException
	 * @throws NotBoundException
	 * @throws ClassNotFoundException 
	 */
	public void stop(boolean remote) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException;
	
	/**
	 * 关闭远程服务
	 * @throws RemoteException
	 */
	public void remoteStop() throws RemoteException;
	
	/**
	 * 判断服务器运行状态
	 * @return
	 * @throws RemoteException
	 */
	public boolean isRunning() throws RemoteException;
}

 2、接着实现RMI服务器相关功能的实现类

package org.bird.rmi;

import java.rmi.AccessException;
import java.rmi.ConnectException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.bird.rmi.support.Listener;

public class RMIServerImpl extends UnicastRemoteObject implements RMIServer {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private boolean isRuning;
	private Listener listener;

	/** 本地主机RMI注册表 */
	private Registry localRegistry = null;
	/** 远程主机RMI注册表 */
	private Registry remoteRegistry = null;
	/** 远程服务器 */
	private RMIServer remoteServer = null ;
	/** 缓存本地对象实例 */
	private Map<String,Object> registeredLocalObjectMap = new HashMap<String,Object>();
	/** 缓存远程对象实例 */
	private Map<String,Remote> registeredRemoteObjectMap = new HashMap<String,Remote>();
	
	public RMIServerImpl() throws RemoteException {
		this(DEFAULT_HOST, DEFAULT_PORT);
	}
	
	public RMIServerImpl(int port) throws RemoteException {
		this(DEFAULT_HOST, port);
	}
	
	public RMIServerImpl(String cip, int cport) throws RemoteException  {
		registerLocalServer(cip, cport);
	}
	
	public RMIServerImpl(String cip,int cport, String sip, int sport) throws RemoteException  {
		getRemoteServer(sip, sport);
		registerLocalServer(cip, cport);
	}
	
	private void registerLocalServer(String cip, int cport) throws AccessException, RemoteException {
		System.setProperty("java.rmi.server.hostname", cip);
		if(localRegistry == null) {
			try {
				localRegistry = LocateRegistry.getRegistry(cip, cport);
				localRegistry.list();
			} catch (Exception e) {
				try {
					localRegistry = LocateRegistry.createRegistry(cport);//在本地主机创建并导出RMI注册表
				} catch (RemoteException e1) {
					throw new RuntimeException(e1);
				}
			}
		}
		localRegistry.rebind(getKey(cip, cport), this);
	}
	
	private void getRemoteServer(String sip, int sport) throws AccessException, RemoteException {
		if(null == sip || "".equals(sip.trim())){
			return;
		}
		remoteRegistry = LocateRegistry.getRegistry(sip, sport);
		try {
			remoteServer = (RMIServer) remoteRegistry.lookup(getKey(sip, sport));
		} catch (ConnectException e) {
			remoteRegistry = null;
			throw new RemoteException("无法获取远程服务:" + getKey(sip, sport));
		} catch (NotBoundException e) {
			remoteRegistry = null;
			throw new RemoteException("无法获取远程服务:" + getKey(sip, sport));
		}
	}
	
	public void registerLocalObject(Class<?> interfaceDefiner,
			Class<?> impl) throws InstantiationException, IllegalAccessException {
		Object instance = impl.newInstance();
		registeredLocalObjectMap.put(interfaceDefiner.getName(), instance);
		System.out.println("注册本地对象:" + interfaceDefiner.getName());
	}

	public void registerRemoteObject(Class<?> interfaceDefiner, Class<?> impl)
			throws RemoteException, InstantiationException, IllegalAccessException {
		Object object = impl.newInstance();//反射实例化
		if(!(object instanceof Remote)){
			throw new ClassCastException("cannot be cast to java.rmi.Remote");
		}
		System.out.println("注册远程对象:" + interfaceDefiner.getName());
		Remote instance = (Remote) object;
		registeredRemoteObjectMap.put(interfaceDefiner.getName(), instance);
		
		if (instance instanceof UnicastRemoteObject) {
			localRegistry.rebind(interfaceDefiner.getName(), instance);//更新对此注册表中指定name的远程引用。
			if(remoteServer != null) {
				remoteServer.registerLocalObject(interfaceDefiner, impl);//对象实例直接推送到远程服务器端
			}
		}else{
			Remote stub = UnicastRemoteObject.exportObject(instance, 0);
			localRegistry.rebind(interfaceDefiner.getName(), stub);//更新对此注册表中指定name的远程引用。
			if(remoteServer != null) {
				remoteServer.registerLocalObject(interfaceDefiner, stub.getClass());//对象实例直接推送到远程服务器端
			}
		}
	}

	public void unregisterObject(Class<?> interfaceDefiner) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
		unregisterObject(interfaceDefiner.getName());
	}
	
	private void unregisterObject(String key) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
		if(registeredLocalObjectMap.containsKey(key)) {
			registeredLocalObjectMap.remove(key);
			System.out.println("注销本地对象:" + key);
		}else if (registeredRemoteObjectMap.containsKey(key)) {
			localRegistry.unbind(key);//移除RMI注册表中的绑定
			Remote remote = registeredRemoteObjectMap.get(key);
			if(remote != null) {
				UnicastRemoteObject.unexportObject(remote, true);//从RMI中移除远程对象
				remote = null;
			}
			registeredRemoteObjectMap.remove(key);
			if(remoteServer != null) {
				remoteServer.unregisterObject(Class.forName(key));//将推送到远程服务器上对象实例注销
			}
			System.out.println("注销远程对象:" + key);
		}
	}
	
	public void unregisterAllObjects() throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
		registeredLocalObjectMap.clear();//注销本地对象
		
		List<String> keys = new ArrayList<String>();
		for(String key : registeredRemoteObjectMap.keySet()) {
			keys.add(key);
		}
		for(String key : keys) {
			unregisterObject(key);
		}
	}
	
	public <T> T getObject(Class<T> interfaceDefiner) throws RemoteException {
		String key = interfaceDefiner.getName();
		if(registeredLocalObjectMap.containsKey(key)) {
			System.out.println("调用本地对象:" + interfaceDefiner.getName());
			return (T) registeredLocalObjectMap.get(key);
		}
		if(registeredRemoteObjectMap.containsKey(key)) {
			System.out.println("调用远程对象:" + interfaceDefiner.getName());
			return (T) registeredRemoteObjectMap.get(key);
		}
		if(remoteServer != null) {
			return remoteServer.getObject(interfaceDefiner);
		}
		return null;
	}
	
	public void start() throws RemoteException {
		listener = new Listener(this);
		this.isRuning = true;
		listener.start();
	}

	public void stop(boolean remote) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
		unregisterAllObjects();
		String[] names = localRegistry.list();
		for(String name : names) {
			localRegistry.unbind(name);
		}
		UnicastRemoteObject.unexportObject(localRegistry, true);
		localRegistry = null;
		if(remote && remoteServer != null){
			remoteServer.remoteStop();
		}
		System.out.println("本地RMI服务器停止...");
		System.exit(0);
	}
	
	public void remoteStop() throws RemoteException {
		this.isRuning = false;
	}

	private String getKey(String name, int port) {
		return name + "|" + port;
	}

	public boolean isRunning() {
		return isRuning;
	}
}

 3、然后再编写一个监听器用于监听服务器

package org.bird.rmi.support;

import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;

import org.bird.rmi.RMIServer;

/**
 * RMI服务监听器
 * @author liangjf
 *
 */
public class Listener extends Thread {

	private RMIServer server;
	
	public Listener(RMIServer server) {
		this.server = server;
	}
	
	public void run() {
		System.out.println("RMI服务器启动...");
		try {
			while(server.isRunning()) {
				
			}
		} catch (RemoteException e) {
			e.printStackTrace();
		}
		try {
			server.stop(false);
		} catch (AccessException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (RemoteException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (NotBoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

 4、到目前一个简单的RPC框架就完成了。接着我们编写两个测试用例

package org.bird.rmi.test.echo;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface Echo extends Remote {

	public String sayHello(String name) throws RemoteException;
}

 

package org.bird.rmi.test.echo;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface Hello extends Remote {

	public String sayHello(String name) throws RemoteException;
}

 

package org.bird.rmi.test.echo;

import java.rmi.RemoteException;

public class EchoImpl implements Echo {

	public EchoImpl() throws RemoteException {
		super();
	}
	public String sayHello(String name) {
		return "Hello, " + name;
	}

}

 

package org.bird.rmi.test.echo;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;


public class RemoteHello extends UnicastRemoteObject implements Hello {

	public RemoteHello() throws RemoteException {
		super();
		// TODO Auto-generated constructor stub
	}

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public String sayHello(String name) throws RemoteException {
		return "Hello, " + name;
	}

}

 再编写一个服务器测试入口类

package org.bird.rmi.test;

import java.rmi.NotBoundException;
import java.rmi.RemoteException;

import org.bird.rmi.RMIServer;
import org.bird.rmi.RMIServerImpl;
import org.bird.rmi.test.echo.Echo;
import org.bird.rmi.test.echo.EchoImpl;
import org.bird.rmi.test.echo.Hello;
import org.bird.rmi.test.echo.RemoteHello;

public class MainServer {

	/**
	 * @param args
	 * @throws RemoteException 
	 * @throws IllegalAccessException 
	 * @throws InstantiationException 
	 * @throws NotBoundException 
	 */
	public static void main(String[] args) throws RemoteException, InstantiationException, IllegalAccessException, NotBoundException {
		RMIServer server = new RMIServerImpl(8888);
		server.registerRemoteObject(Hello.class, RemoteHello.class);
		server.registerRemoteObject(Echo.class, EchoImpl.class);
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				System.out.println("服务端JVM关闭...");
			}
		});
		server.start();
	}

}

最后写两个客户端的测试入口类

package org.bird.rmi.test;

import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;

import org.bird.rmi.RMIServer;
import org.bird.rmi.RMIServerImpl;
import org.bird.rmi.test.echo.Hello;

public class MainClient1 {

	/**
	 * @param args
	 * @throws NotBoundException 
	 * @throws RemoteException 
	 * @throws ClassNotFoundException 
	 * @throws IllegalAccessException 
	 * @throws InstantiationException 
	 * @throws InterruptedException 
	 * @throws AccessException 
	 */
	public static void main(String[] args) throws RemoteException, NotBoundException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException {
		RMIServer client = new RMIServerImpl(RMIServer.DEFAULT_HOST, 9998, RMIServer.DEFAULT_HOST, RMIServer.DEFAULT_PORT);
		Hello hello = client.getObject(Hello.class);
		System.out.println(hello.sayHello("ljf"));

		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				System.out.println("客户端1关闭.....");
			}
		});
		client.stop(false);
	}

}

 

package org.bird.rmi.test;

import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;

import org.bird.rmi.RMIServer;
import org.bird.rmi.RMIServerImpl;
import org.bird.rmi.test.echo.Echo;

public class MainClient2 {

	/**
	 * @param args
	 * @throws NotBoundException 
	 * @throws RemoteException 
	 * @throws ClassNotFoundException 
	 * @throws IllegalAccessException 
	 * @throws InstantiationException 
	 * @throws InterruptedException 
	 * @throws AccessException 
	 */
	public static void main(String[] args) throws RemoteException, NotBoundException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException {
		RMIServer client = new RMIServerImpl(RMIServer.DEFAULT_HOST, 9999, RMIServer.DEFAULT_HOST, RMIServer.DEFAULT_PORT);
		Echo echo = client.getObject(Echo.class);
		System.out.println(echo.sayHello("飞飞"));
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				System.out.println("客户端2关闭.....");
			}
		});
		client.stop(true);
	}

}

 5、测试用例编写完成后将RMIServer、RMIServerImpl、Listener、Echo、Hello、EchoImp、RemoteHello、MainServer这些类打包成可运行的server.jar;RMIServer、RMIServerImpl、Listener、Hello、MainClient1这些类打包成可以运行的client1.jar;RMIServer、RMIServerImpl、Listener、Echo、MainClient2这些类打包成可以运行的client2.jar

6、首先在命令行下运行server.jar

7、再另打开一窗口运行client1.jar查看一下运行情况

 

8、另外再打开一个窗口运行client2.jar查看客户端与服务端的变化情况


 

 

总结:上面实现的RMI框架服务端与客户端可以相互调用(既是服务端也是客户端),可以注册本地对象或者注册远程对象,对象的调用就跟本地调用一样对上层使用者来说是完全透明的。

  • 大小: 3 KB
  • 大小: 7.7 KB
  • 大小: 2.5 KB
  • 大小: 4.1 KB
  • 大小: 2.9 KB
  • 大小: 4 KB
分享到:
评论

相关推荐

    基于Java的简易RPC框架.zip

    基于Java的简易RPC框架 项目简介 本项目是一个简易的RPC(远程过程调用)框架,旨在通过模拟实现一个基本的RPC调用流程,帮助理解RPC的核心概念和实现原理。服务端采用Tomcat服务器,消费端使用HTTP协议发送网络...

    Java rpc框架简易版,类似dubbo分布式实现 (纯socket实现).zip

    本项目提供了一个简易版的Java RPC框架实现,旨在模仿著名的Dubbo框架,但采用了更基础的Socket通信方式进行分布式服务的搭建。以下是这个项目的核心知识点: 1. **RPC原理**:RPC使得客户端可以像调用本地方法一样...

    基于Java语言的简易RPC框架设计源码

    该项目为基于Java语言的简易RPC框架设计源码,包含66个文件,其中包括51个Java源文件、5个XML配置文件、3个序列化工具类、2个Markdown文件、2个属性配置文件以及1个Git忽略文件和1个注册中心配置文件。此框架适用于...

    基于Java实现一个简易的RPC框架【100012743】

    在这个基于Java实现的简易RPC框架项目中,我们将深入探讨如何构建这样一个框架,并实现对`printf`函数的远程调用。 首先,我们要理解RPC的基本原理。RPC使得客户端可以像调用本地方法一样调用远程服务器上的方法,...

    基于Java和Vert.x的简易RPC框架设计源码

    该项目是一个基于Java和Vert.x技术的简易RPC框架设计源码,共包含60个文件,主要由45个Java源文件、9个XML配置文件以及少量其他类型文件构成。该框架旨在提供一种高效、简洁的远程过程调用解决方案,适用于需要跨...

    Java rpc框架简易版,类似dubbo分布式实现 (纯socket实现)

    简介 demo-rpc(标准maven工程) 使用纯Java socket及简单多线程技术,不依赖任何第三方库类,实现简单实现类似dubbo的rpc调用。仅用于学习了解rpc调用过程, 实现略显简单,只体现rpc调用的关键步骤,存在很多优化细节,...

    guide-rpc-framework:由Netty + Kyro + Zookeeper实现的自定义RPC框架。(基于Netty + Kyro + Zookeeper实现的自定义RPC框架-附加详细实现过程和相关教程。)

    guide 目前只实现了RPC框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自我完善。 通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/...

    基于Netty+Kyro+Zookeeper的RPC框架.zip

    # 基于Netty+Kyro+Zookeeper的RPC框架 [中文](./README.md)|English ## 前言 通过这个简易的轮子,你可以学到 RPC 的底层原理及原理以及各种 Java 编码实践的运用。 ## 介绍 由于 Guide哥自身精力和能力...

    Netty4.1实战-手写RPC框架.pdf

    本文档将通过一系列章节详细介绍如何基于Netty 4.1版本实现一个简易的RPC框架。 #### 二、基础知识概述 ##### 2.1 RPC概念 - **定义**:RPC(Remote Procedure Call Protocol)即远程过程调用协议,它允许程序...

    分布式代码rpc.rar_DEMO_caf 分布式_herselfcz9_java f4框架_分布式

    在这个Java简易RPC框架中,"f4框架"可能是框架的一个组成部分,可能是用于序列化、网络通信或者服务治理的组件。"caf_分布式 herselfcz9"这部分标签信息可能是指框架的作者或版本,"caf"可能是一个简写或代号,...

    rpc-framework:一个rpc框架演示

    rpc框架前言学习javaGuide,自己动手造个轮子,通过这个简易的轮子,可以学到RPC的扭曲原理和原理以及各种Java编码实践的运用。介绍是一种基于Netty + Kyro + Zookeeper实现的RPC框架。设计思路一个基本的RPC框架...

    xmljava系统源码-handwriting:开源框架之手写系列,主要手写了springmvc,mybatis,rpc框架的实现,目前是跟着

    xml java系统源码 handwriting ...使用Java默认的序列化以及传统Socket通信实现简易的rpc框架 hw-mybatis 实现了mybatis的主要的核心功能,目前在xml解析以及查询返回值这些地方采用硬编码实现 待续中...

    jim-framework:一些公共组件及学习应用:RPC统一配置中心基于注解的分布式锁dubbo请求级缓存调用链追踪RabbitMQElasticsearchzookeeperSping boot

    熔断降级机制简易RPC框架-SPI熔断降级实现影响上下文机制,后续更新解决基于注解的锁Spring boot实践WEBValidator (未同步代码)多字段动态运算符HandlerMethodArgumentResolver (未同步代码)接口参数注入消息...

    mini-rpc:一个简易版 RPC 框架

    在一次 RPC 调用流程中,这三个组件是这样交互的: 服务端在启动后,会将它提供的服务列表发布到注册中心,客户端向注册中心订阅服务地址; 客户端会通过本地代理模块 Proxy 调用服务端,Proxy 模块收到负责将方法、...

    java开源包4

    nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...

    zrpc:简易的rpc框架

    zrpc是一个针对Java开发的轻量级RPC框架,旨在简化服务间的通信,提高系统的可扩展性和解耦性。 在zrpc框架中,主要涉及以下几个核心概念和技术点: 1. **服务提供者(Service Provider)**:服务提供者是拥有特定...

    java开源包3

    nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...

    guide-rpc-framework:A custom RPC framework implemented by Netty+Kyro+Zookeeper.(一款基于 Netty+Kyro+Zookeeper 实现的自定义 RPC 框架-附详细实现过程和相关教程。)

    通过这个简易的轮子,你可以学到 RPC 的底层原理和原理以及各种 Java 编码实践的运用。 你甚至可以把 当做你的毕设/项目经验的选择,这是非常不错!对比其他求职者的项目经验都是各种系统,造轮子肯定是更加能赢得...

    Java 实现的一个简易区块链(联盟链)项目

    主程序使用 SpringBoot 实现, P2P 传输这块使用的是 t-io 网络框架。 运行环境为 JDK1.8 以上版本。 项目模块 blockj-base 基础公共的工具包,如加密,区块,消息等数据模型,数据存储等。 blockj-miner 区块链主...

    java开源包6

    nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...

Global site tag (gtag.js) - Google Analytics