RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { // 获取引用 String interfaceName = input.readUTF(); //获取 方法名 String methodName = input.readUTF(); // Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); try { Object service = handlerMap.get(interfaceName); Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布服务 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $ */ @Documented @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Component public @interface SRPC { public Class<?> interf(); }
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { output.writeUTF(interfaceClass.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。
/** * * @author zhangwei_david * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ */ public class SrpcRequest implements Serializable { /** */ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>interfaceName</tt>. * * @return property value of interfaceName */ public String getInterfaceName() { return interfaceName; } /** * Setter method for property <tt>interfaceName</tt>. * * @param interfaceName value to be assigned to property interfaceName */ public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. * * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. * * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } }
/** * * @author zhangwei_david * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ */ public class SrpcResponse implements Serializable { /** */ private static final long serialVersionUID = -5934073769679010930L; // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } }
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { // 获取RPC请求 SrpcRequest request = (SrpcRequest) input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { SrpcResponse response = doHandle(request); output.writeObject(response); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); output.writeObject(request); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { SrpcResponse response = (SrpcResponse) input.readObject(); if (response.getError() != null && response.getError() instanceof Throwable) { throw response.getError(); } return response.getResult(); } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
实现RPC涉及到多个关键组件: - **Proxy(代理)**:客户端的调用接口,让客户端可以像调用本地方法一样调用远程服务。 - **Serialization(序列化)/Deserialization(反序列化)**:处理数据在客户端和服务器之间...
RPC(Remote Procedure Call)是一种计算机通信协议,允许一个程序在某个网络中的一个计算机上执行远程操作,就像它在本地...通过了解这些基础知识,开发者可以更好地理解和实现RPC,从而构建高效、可靠的分布式系统。
要实现一个简单的RPC架构Web Service,我们通常会遵循以下步骤: 1. **定义服务接口**:首先,我们需要定义服务的接口,这通常是一个包含若干方法的Java接口。例如,我们可以创建一个名为`CalculatorService`的接口...
在这个“netty的rpc协议的简单实现”项目中,我们将探讨Netty如何被用来构建RPC框架的关键组件和流程。 首先,我们需要理解RPC的基本原理。RPC的核心是封装网络通信细节,使得客户端和服务器之间通过接口进行通信,...
本项目提供了一个简易版的Java RPC框架实现,旨在模仿著名的Dubbo框架,但采用了更基础的Socket通信方式进行分布式服务的搭建。以下是这个项目的核心知识点: 1. **RPC原理**:RPC使得客户端可以像调用本地方法一样...
知识点: 1. Selenium:Selenium 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。...整个过程只需要简单的10行代码,就能够实现JS的rpc,且支持POST和GET请求。
总之,Apache Mina提供了一种灵活且高效的方式来实现RPC,它的事件驱动模型和强大的过滤器框架使得构建网络服务变得更加简单。在实际开发中,可以根据具体需求调整和扩展这些基础架构,以满足不同场景下的性能和功能...
实现一个简单的RPC框架主要涉及以下几个关键点: 1. **网络通信库**:Netty是一个高性能、异步事件驱动的网络应用框架,常用于构建高并发、低延迟的服务器。在RPC框架中,Netty可以处理客户端与服务端之间的网络...
简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC ...
在Apache XML-RPC中,实现RPC服务和客户端涉及四个主要部分: 1. **RPC服务器端代码**:这部分代码创建了一个WebServer实例,监听特定端口(例如8399),并配置XML-RPC服务器来处理请求。服务器还需要一个...
在Java中实现一个简单的RPC框架,我们需要理解以下几个关键概念和技术: 1. **网络通信**:RPC的核心是通过网络进行通信。在Java中,我们可以使用Socket API来实现客户端和服务器端之间的数据传输。Socket提供了低...
- XML-RPC(XML Remote Procedure Call)是一种简单的基于HTTP协议的远程调用方法,使用XML作为数据传输格式。 - 它允许客户端(如LabVIEW应用)通过发送一个HTTP请求到服务器执行特定的函数或方法,并接收服务器...
基于RPC的简单应用通常涉及服务提供者(Server)和消费者(Client)两部分,使得分布式系统间的通信变得更加便捷。 在本压缩包中,"基于RPC的简单应用"可能包括以下组成部分: 1. **实验数据**:这部分可能包含了...
本教程将深入探讨如何在SpringBoot中实现RPC服务。 首先,理解RPC的基本概念是必要的。RPC允许服务提供者暴露一组服务接口,服务消费者通过这些接口像调用本地方法一样调用远程服务。整个过程对调用者透明,隐藏了...
sunrpc是开源实现RPC协议的一个库,广泛用于Linux环境。在NFS系统中,sunrpc提供了解决远程调用的关键机制。 NFS SIMULATOR的主要功能: 1. 目录操作:包括创建、列出和删除目录。 2. 文件操作:支持创建、列出和...