RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现
服务器端,通过SocketServer,持续接收客户端的请求,并将客户端的请求分发到指定的处理器出去处理。
阻塞通线模型,是server对每一个请求都开启一条线程去执行请求,此种方式的缺点是服务器端线程的数量和客户端并发访问请求树呈1:1的正比关系。
此处对此作出了一定优化,伪异步IO通信,将所有用户请求放到线程池中处理。
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { // 获取引用 String interfaceName = input.readUTF(); //获取 方法名 String methodName = input.readUTF(); // Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); try { Object service = handlerMap.get(interfaceName); Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布服务 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $ */ @Documented @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Component public @interface SRPC { public Class<?> interf(); }
至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { output.writeUTF(interfaceClass.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。
/** * * @author zhangwei_david * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ */ public class SrpcRequest implements Serializable { /** */ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>interfaceName</tt>. * * @return property value of interfaceName */ public String getInterfaceName() { return interfaceName; } /** * Setter method for property <tt>interfaceName</tt>. * * @param interfaceName value to be assigned to property interfaceName */ public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. * * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. * * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } }
/** * * @author zhangwei_david * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ */ public class SrpcResponse implements Serializable { /** */ private static final long serialVersionUID = -5934073769679010930L; // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } }
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { // 获取RPC请求 SrpcRequest request = (SrpcRequest) input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { SrpcResponse response = doHandle(request); output.writeObject(response); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); output.writeObject(request); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { SrpcResponse response = (SrpcResponse) input.readObject(); if (response.getError() != null && response.getError() instanceof Throwable) { throw response.getError(); } return response.getResult(); } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
相关推荐
以上就是使用socket、反射和序列化技术实现简单RPC框架的主要过程。这个框架虽然基础,但已经能够展示RPC的核心思想。对于更复杂的场景,如负载均衡、服务发现、容错机制等,可以在此基础上进行扩展和完善。在实际...
实现RPC涉及到多个关键组件: - **Proxy(代理)**:客户端的调用接口,让客户端可以像调用本地方法一样调用远程服务。 - **Serialization(序列化)/Deserialization(反序列化)**:处理数据在客户端和服务器之间...
以上就是基于Java实现简单RPC框架的关键知识点和实现流程。理解这些概念对于深入学习分布式系统和Java高级编程至关重要。通过实践这样一个项目,你可以更好地掌握这些技术,并能够灵活运用到实际工作中去。
在提供的"school-rpc(手写rpc)"压缩包中,我们可以看到作者实现的客户端和服务端源代码,通过阅读这些代码,可以深入理解RPC的工作流程和实现细节,有助于提升对分布式系统和网络通信的理解。在实际项目中,我们还...
RPC(Remote Procedure Call)是一种计算机通信协议,允许一个程序在某个网络中的一个计算机上执行远程操作,就像它在本地...通过了解这些基础知识,开发者可以更好地理解和实现RPC,从而构建高效、可靠的分布式系统。
要实现一个简单的RPC架构Web Service,我们通常会遵循以下步骤: 1. **定义服务接口**:首先,我们需要定义服务的接口,这通常是一个包含若干方法的Java接口。例如,我们可以创建一个名为`CalculatorService`的接口...
在这个“netty的rpc协议的简单实现”项目中,我们将探讨Netty如何被用来构建RPC框架的关键组件和流程。 首先,我们需要理解RPC的基本原理。RPC的核心是封装网络通信细节,使得客户端和服务器之间通过接口进行通信,...
本项目提供了一个简易版的Java RPC框架实现,旨在模仿著名的Dubbo框架,但采用了更基础的Socket通信方式进行分布式服务的搭建。以下是这个项目的核心知识点: 1. **RPC原理**:RPC使得客户端可以像调用本地方法一样...
总的来说,这个“菜鸡”实现虽然简单,但能帮助理解RPC的基本原理,是学习和掌握RPC技术的良好起点。实际生产环境中,我们需要考虑更多的因素,如性能、稳定性、扩展性等,选择或定制更适合的RPC框架。
总之,Apache Mina提供了一种灵活且高效的方式来实现RPC,它的事件驱动模型和强大的过滤器框架使得构建网络服务变得更加简单。在实际开发中,可以根据具体需求调整和扩展这些基础架构,以满足不同场景下的性能和功能...
实现一个简单的RPC框架主要涉及以下几个关键点: 1. **网络通信库**:Netty是一个高性能、异步事件驱动的网络应用框架,常用于构建高并发、低延迟的服务器。在RPC框架中,Netty可以处理客户端与服务端之间的网络...
在IT行业中,分布式系统间的通信通常是一个复杂的问题。...RabbitMQ的灵活性和强大的功能使其成为实现RPC的一种理想选择,尤其是在Java这样的多层架构中,能够有效地解耦组件,提高系统的可扩展性和可靠性。
selenium—jsrpc,selenium实现JS的rpc,最近简单的JSRpc,10行代码实现 支持POST,GET 安装 pip install Flask pip install selenium 就可以运行了
简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC ...
在Java中实现一个简单的RPC框架,我们需要理解以下几个关键概念和技术: 1. **网络通信**:RPC的核心是通过网络进行通信。在Java中,我们可以使用Socket API来实现客户端和服务器端之间的数据传输。Socket提供了低...
基于RPC的简单应用通常涉及服务提供者(Server)和消费者(Client)两部分,使得分布式系统间的通信变得更加便捷。 在本压缩包中,"基于RPC的简单应用"可能包括以下组成部分: 1. **实验数据**:这部分可能包含了...
本教程将深入探讨如何在SpringBoot中实现RPC服务。 首先,理解RPC的基本概念是必要的。RPC允许服务提供者暴露一组服务接口,服务消费者通过这些接口像调用本地方法一样调用远程服务。整个过程对调用者透明,隐藏了...
sunrpc是开源实现RPC协议的一个库,广泛用于Linux环境。在NFS系统中,sunrpc提供了解决远程调用的关键机制。 NFS SIMULATOR的主要功能: 1. 目录操作:包括创建、列出和删除目录。 2. 文件操作:支持创建、列出和...
在Java中,我们可以使用RMI(Remote Method Invocation)或者自定义TCP/HTTP协议来实现RPC框架。RMI是Java内置的一种轻量级远程调用方式,但它的跨平台性和扩展性有限。自定义协议则可以更灵活地控制网络通信的细节...