实现带监听功能简易RPC
通过上一编博文的学习,我们同样利用socket、JDK动态代理实现一个带监听功能并能停止服务器的简易RPC框架。
首先定义一个服务器接口
package org.bird.rpc2.support; import org.bird.rpc2.protocal.Invocation; /** * 服务器管理 * @author liangjf * */ public interface Server { /** * 停止服务器 */ public void stop(); /** * 启动服务器 */ public void start(); /** * 注册服务 * @param interfaceDefiner * @param impl */ public void register(Class<?> interfaceDefiner,Class<?> impl); /** * 远程调用 * @param invo */ public void call(Invocation invo); /** * 验证服务运行状态 * @return */ public boolean isRunning(); /** * 获取端口 * @return */ public Integer getPort(); /** * 获取主机IP * @return */ public String getHost(); }
接着我们写一个服务监听器
package org.bird.rpc2.support; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import org.bird.rpc2.RPC; import org.bird.rpc2.local.ServerManager; import org.bird.rpc2.protocal.Result; import org.bird.rpc2.protocal.Invocation; /** * 服务监听器 * @author liangjf * */ public class Listener extends Thread { /** 服务器socket */ private ServerSocket socket; /** 服务器 */ private Server server; public Listener(Server server) { this.server = server; } @Override public void run() { String host = server.getHost(); Integer port = server.getPort(); port = port == null? RPC.DEFAULT_RPC_PORT:port; System.out.println("启动服务器中,打开端口" + port); try { ////创建服务器socket socket = host == null ? new ServerSocket(port) : new ServerSocket(server.getPort(), 50, InetAddress.getByName(host)); } catch (IOException e1) { e1.printStackTrace(); return; } while (server.isRunning()) { ObjectInputStream ois = null; ObjectOutputStream oos = null; try { System.out.println("等待请求"); Socket client = socket.accept();//堵塞监听客户socket请求 System.out.println("请求到来"); ois = new ObjectInputStream(client.getInputStream()); Invocation invo = (Invocation) ois.readObject(); System.out.println("远程调用:" + invo); server.call(invo); org.bird.rpc2.protocal.Method method = invo.getMethod(); if(invo.getInterfaces() == ServerManager.class && method.getMethodName().equals("stop") && invo.getResult() instanceof Result && ((Result)invo.getResult()).isStop()) {//接收到关闭服务器命令 server.stop(); } oos = new ObjectOutputStream(client.getOutputStream()); oos.writeObject(invo); oos.flush(); } catch (Exception e) { e.printStackTrace(); }finally { try { oos.close(); ois.close(); } catch (IOException e) { e.printStackTrace(); } } } try { if (socket != null && !socket.isClosed()) { System.out.println("正在关闭服务器..."); socket.close(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
接着定义协议辅助类
package org.bird.rpc2.protocal; import java.io.Serializable; import java.util.Arrays; /** * 调用实例 * @author liangjf * */ public class Invocation implements Serializable{ /** * */ private static final long serialVersionUID = 1L; /** 接口 */ private Class<?> interfaces; /** 方法 */ private Method method; /** 参数数组 */ private Object[] params; /** 结果 */ private Object result; /** * @return the result */ public Object getResult() { return result; } /** * @param result the result to set */ public void setResult(Object result) { this.result = result; } /** * @return the interfaces */ public Class<?> getInterfaces() { return interfaces; } /** * @param interfaces the interfaces to set */ public void setInterfaces(Class<?> interfaces) { this.interfaces = interfaces; } /** * @return the method */ public Method getMethod() { return method; } /** * @param method the method to set */ public void setMethod(Method method) { this.method = method; } /** * @return the params */ public Object[] getParams() { return params; } /** * @param params the params to set */ public void setParams(Object[] params) { this.params = params; } @Override public String toString() { return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")"; } }
package org.bird.rpc2.protocal; import java.io.Serializable; /** * 调用方法 * @author liangjf * */ public class Method implements Serializable{ /** * */ private static final long serialVersionUID = 1L; /** 方法名 */ private String methodName; /** 参数数组 */ private Class<?>[] params; public Method(String name, Class<?>[] parameterTypes) { this.methodName = name; this.params = parameterTypes; } /** * @return the methodName */ public String getMethodName() { return methodName; } /** * @param methodName the methodName to set */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * @return the params */ public Class<?>[] getParams() { return params; } /** * @param params the params to set */ public void setParams(Class<?>[] params) { this.params = params; } }
package org.bird.rpc2.protocal; import java.io.Serializable; /** * 停止服务器结果 * @author liangjf * */ public class Result implements Serializable { /** * */ private static final long serialVersionUID = 1L; /** 是否允许停止 */ private boolean isStop; /** 错误信息 */ private String msg; public boolean isStop() { return isStop; } public void setStop(boolean isStop) { this.isStop = isStop; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }
然后利用JDK动态代理编写一个RPC核心框架类
package org.bird.rpc2; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; import org.bird.rpc2.protocal.Invocation; import org.bird.rpc2.support.Client; import org.bird.rpc2.support.Listener; import org.bird.rpc2.support.Server; public class RPC { public static String DEFAULT_RPC_HOST = "127.0.0.1"; public static int DEFAULT_RPC_PORT = 8888; public static int DEFAULT_TIMEOUT = 1000 * 60;//一分钟 public static <T> T getProxy(final Class<T> clazz,String host,int port) { return getProxy(clazz, host, port, DEFAULT_TIMEOUT); } public static <T> T getProxy(final Class<T> clazz,String host,int port, int timeout) { if (port < 0 || port > 0xFFFF) {// 0-65535 throw new IllegalArgumentException("port out of range:" + port); } final Client client = new Client(host,port,timeout); InvocationHandler handler = new InvocationHandler() {//代理调用实例 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Invocation invo = new Invocation(); invo.setInterfaces(clazz); invo.setMethod(new org.bird.rpc2.protocal.Method(method.getName(),method.getParameterTypes())); invo.setParams(args); try { client.invoke(invo); return invo.getResult(); } catch (Exception e) { throw e; }finally { if(client != null) { if(client.getOis() != null){ client.getOis().close(); } if(client.getOos() != null) { client.getOos().close(); } if(client.getSocket() != null && !client.getSocket().isClosed()) { client.getSocket().close(); } } } } }; T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);//创建动态代理 return t; } public static class RPCServer implements Server{ private String host; private Integer port; private Listener listener; private boolean isRuning = true; private Map<String ,Object> serviceEngine = new HashMap<String, Object>(); public RPCServer(String host, Integer port) { if (port != null && (port < 0 || port > 0xFFFF)) { throw new IllegalArgumentException("port out of range:" + port); } this.host = host; this.port = port; } /** * @param isRuning the isRuning to set */ public void setRuning(boolean isRuning) { this.isRuning = isRuning; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public Integer getPort() { return port; } public void setPort(Integer port) { this.port = port; } public void call(Invocation invo) { System.out.println(invo.getClass().getName()); Object obj = serviceEngine.get(invo.getInterfaces().getName()); if(obj!=null) { try { Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams()); Object result = m.invoke(obj, invo.getParams()); invo.setResult(result); } catch (Throwable th) { th.printStackTrace(); } } else { throw new IllegalArgumentException("has no these class"); } } public void register(Class<?> interfaceDefiner, Class<?> impl) { try { this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance()); System.out.println("注册服务:" + serviceEngine); } catch (Throwable e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void start() { System.out.println("启动服务器"); listener = new Listener(this); this.isRuning = true; listener.start(); } public void stop() { this.setRuning(false); } public boolean isRunning() { return isRuning; } } }
接着编写一个服务器管理本地接口
package org.bird.rpc2.local; import org.bird.rpc2.protocal.Result; /** * 本地调用-管理服务器 * @author liangjf * */ public interface ServerManager { /** * 注册服务 * @param interfaceDefiner * @param impl */ public void register(Class<?> interfaceDefiner,Class<?> impl); /** * 开启服务器 */ public void start(); /** * 根据端口开启服务器 * @param port */ public void start(Integer port); /** * 根据IP与端口开启服务器 * @param host * @param port */ public void start(String host, Integer port); /** * 停止服务器 * @param name 用户 * @param pawd 密码 * @return */ public Result stop(final String name, final String pawd); }
接着是具体实现类
package org.bird.rpc2.local.impl; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import org.bird.rpc2.RPC; import org.bird.rpc2.local.ServerManager; import org.bird.rpc2.protocal.Result; import org.bird.rpc2.support.Server; import org.bird.rpc2.util.StringUtil; import org.bird.security.EncryXOR; public final class ServerManagerImpl implements ServerManager { private Map<Class<?>,Class<?>> serviceEngine = new HashMap<Class<?>,Class<?>>(); public void start() { this.start(null, null); } public void start(Integer port) { this.start(null, port); } public void start(String host, Integer port) { Server server = new RPC.RPCServer(host, port); server.register(ServerManager.class, ServerManagerImpl.class);//注册本地服务 Iterator<Entry<Class<?>,Class<?>>> it = serviceEngine.entrySet().iterator(); while(it.hasNext()) {//注册远程服务列表 Entry<Class<?>,Class<?>> entry = it.next(); server.register(entry.getKey(), entry.getValue()); } server.start();//启动服务器 } public Result stop(String name, String pawd) { Result result = new Result(); boolean isStop = true; String msg = "服务器正常停止"; if(StringUtil.isEmpty(name)) { isStop = false; msg = "用户名不能为空"; } if(StringUtil.isEmpty(pawd)){ isStop = false; msg = "密码不能为空"; } String tname = EncryXOR.Decryptor(name); String tpawd = EncryXOR.Decryptor(pawd); if(!tname.equals("ljf") || !tpawd.equals("123456")){ isStop = false; msg = "用户或密码错误"; } result.setStop(isStop); result.setMsg(msg); return result; } /** * 注册服务 * @param interfaceDefiner * @param impl */ public void register(Class<?> interfaceDefiner,Class<?> impl) { serviceEngine.put(interfaceDefiner, impl); } }
再编写一个客户端的类
package org.bird.rpc2.support; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.UnknownHostException; import org.bird.rpc2.protocal.Invocation; public class Client { private String host; private int port; private int timeout; private Socket socket; private ObjectOutputStream oos; private ObjectInputStream ois; public Socket getSocket() { return socket; } public void setSocket(Socket socket) { this.socket = socket; } public ObjectOutputStream getOos() { return oos; } public void setOos(ObjectOutputStream oos) { this.oos = oos; } public ObjectInputStream getOis() { return ois; } public void setOis(ObjectInputStream ois) { this.ois = ois; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public Client(String host, int port, Integer timeout) { this.host = host; this.port = port; this.timeout = timeout; } public void init() throws UnknownHostException, IOException { SocketAddress socketaddress = new InetSocketAddress(host, port); socket = new Socket();// 创建socket socket.connect(socketaddress, timeout); oos = new ObjectOutputStream(socket.getOutputStream()); } public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException { init(); System.out.println("发送请求"); Invocation result = null; try { oos.writeObject(invo); oos.flush(); ois = new ObjectInputStream(socket.getInputStream()); result = (Invocation) ois.readObject(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } invo.setResult(result.getResult()); } }
这个整个简易RPC架构就完成了。
接着我们写个测试用例,首先是服务器入口类
class MainServer { public static void main(String[] args) { ServerManager server = new ServerManagerImpl(); server.register(Calculate.class, RemoteCalculate.class); server.start(); Runtime.getRuntime().addShutdownHook(new Thread() {//添加jvm退出钩子 public void run() { System.out.println("服务器已停止..."); } }); } }
再编写一个服务器管理的本地调用类
class ServerStop { public static void main(String[] args) { Scanner sc = new Scanner(System.in); System.out.println("请输入用户:"); String name = sc.next(); System.out.println("请输入密码:"); String pawd = sc.next(); ServerManager client = RPC.getProxy(ServerManager.class, RPC.DEFAULT_RPC_HOST, RPC.DEFAULT_RPC_PORT); name = EncryXOR.Encrytor(name); pawd = EncryXOR.Encrytor(pawd); Result result = client.stop(name, pawd); System.out.println("响应信息:" + result.getMsg()); } }
写一个远程调用加法服务的客户端
public class MainClient1 { /** * @param args */ public static void main(String[] args) { Calculate client = RPC.getProxy(Calculate.class, RPC.DEFAULT_RPC_HOST, RPC.DEFAULT_RPC_PORT, 5000); int sum = client.add(2, 3); System.out.println(sum); }
写一个远程调用乘法服务的客户端
public class MainClient2 { /** * @param args */ public static void main(String[] args) { Calculate client = RPC.getProxy(Calculate.class, RPC.DEFAULT_RPC_HOST, RPC.DEFAULT_RPC_PORT, 5000); int sum = client.mult(2, 3); System.out.println(sum); }
最后把他们分别打成可执行的ServerStart.jar、ServerStop.jar、MainClient1.jar、MainClient2.jar。通过命令行窗口把ServerStart.jar运行起来
然后计算加法的远程调用运行起来看看运行结果
再打开一个窗口将乘法计算的远程调用运行起来
我们再看看服务端打印出来的远程调用过程
当我们想要停止服务器时,在本地命令行中运行ServerStop.jar
我们再看看服务器是不是正常停止了
整个测试顺利通过。能够简单实现远程调用功能。
相关推荐
基于Java的简易RPC框架 项目简介 本项目是一个简易的RPC(远程过程调用)框架,旨在通过模拟实现一个基本的RPC调用流程,帮助理解RPC的核心概念和实现原理。服务端采用Tomcat服务器,消费端使用HTTP协议发送网络...
本项目提供了一个简易版的Java RPC框架实现,旨在模仿著名的Dubbo框架,但采用了更基础的Socket通信方式进行分布式服务的搭建。以下是这个项目的核心知识点: 1. **RPC原理**:RPC使得客户端可以像调用本地方法一样...
该项目为基于Java语言的简易RPC框架设计源码,包含66个文件,其中包括51个Java源文件、5个XML配置文件、3个序列化工具类、2个Markdown文件、2个属性配置文件以及1个Git忽略文件和1个注册中心配置文件。此框架适用于...
在这个基于Java实现的简易RPC框架项目中,我们将深入探讨如何构建这样一个框架,并实现对`printf`函数的远程调用。 首先,我们要理解RPC的基本原理。RPC使得客户端可以像调用本地方法一样调用远程服务器上的方法,...
该项目是一个基于Java和Vert.x技术的简易RPC框架设计源码,共包含60个文件,主要由45个Java源文件、9个XML配置文件以及少量其他类型文件构成。该框架旨在提供一种高效、简洁的远程过程调用解决方案,适用于需要跨...
简介 demo-rpc(标准maven工程) 使用纯Java socket及简单多线程技术,不依赖任何第三方库类,实现简单实现类似dubbo的rpc调用。仅用于学习了解rpc调用过程, 实现略显简单,只体现rpc调用的关键步骤,存在很多优化细节,...
guide 目前只实现了RPC框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自我完善。 通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/...
# 基于Netty+Kyro+Zookeeper的RPC框架 [中文](./README.md)|English ## 前言 通过这个简易的轮子,你可以学到 RPC 的底层原理及原理以及各种 Java 编码实践的运用。 ## 介绍 由于 Guide哥自身精力和能力...
本文档将通过一系列章节详细介绍如何基于Netty 4.1版本实现一个简易的RPC框架。 #### 二、基础知识概述 ##### 2.1 RPC概念 - **定义**:RPC(Remote Procedure Call Protocol)即远程过程调用协议,它允许程序...
在这个Java简易RPC框架中,"f4框架"可能是框架的一个组成部分,可能是用于序列化、网络通信或者服务治理的组件。"caf_分布式 herselfcz9"这部分标签信息可能是指框架的作者或版本,"caf"可能是一个简写或代号,...
xml java系统源码 handwriting ...使用Java默认的序列化以及传统Socket通信实现简易的rpc框架 hw-mybatis 实现了mybatis的主要的核心功能,目前在xml解析以及查询返回值这些地方采用硬编码实现 待续中...
过滤器机制简易RPC框架-客户端限流配置简易RPC框架-上下文简易RPC框架-代理简易RPC框架-熔断降级机制简易RPC框架-SPI熔断降级实现影响上下文机制,后续更新解决基于注解的锁Spring boot实践WEBValidator (未同步...
rpc框架前言学习javaGuide,自己动手造个轮子,通过这个简易的轮子,可以学到RPC的扭曲原理和原理以及各种Java编码实践的运用。介绍是一种基于Netty + Kyro + Zookeeper实现的RPC框架。设计思路一个基本的RPC框架...
RPC 框架包含三个最重要的组件,分别是客户端、服务端和注册中心。在一次 RPC 调用流程中,这三个组件是这样交互的: 服务端在启动后,会将它提供的服务列表发布到注册中心,客户端向注册中心订阅服务地址; 客户端...
zrpc是一个针对Java开发的轻量级RPC框架,旨在简化服务间的通信,提高系统的可扩展性和解耦性。 在zrpc框架中,主要涉及以下几个核心概念和技术点: 1. **服务提供者(Service Provider)**:服务提供者是拥有特定...
nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...
通过这个简易的轮子,你可以学到 RPC 的底层原理和原理以及各种 Java 编码实践的运用。 你甚至可以把 当做你的毕设/项目经验的选择,这是非常不错!对比其他求职者的项目经验都是各种系统,造轮子肯定是更加能赢得...
它使用 SpringBoot + Tio 网络框架实现,是一个非常好的区块链学习项目,目前只实现了 POW 共识算法,如果要用于生产项目需要根据自己的项目需求修改共识。 blockj-base 基础公共的工具包,如加密,区块,消息等...
nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...
nfs-rpc是一个集成了各种知名通信框架的高性能RPC框架,目前其最好的性能为在采用grizzly作为通信框架,采用pb作为序列化/反序列化时,tps为168k次/秒。 其支持的功能主要为: 1、透明的调用远端服务器提供的功能...