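RPC (Remote Procedure Call). Before introducing distributed RPC, let us first look at a simple RPC implementation in Java.
On the server side, a ServerSocket continuously accepts client requests and dispatches each request to the designated handler for processing.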
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.Lifecycle;

/**
 * @author zhangwei_david
 * @version $Id: ServiceServer.java, v 0.1 2015-08-08 11:40:41 zhangwei_david Exp $
 */
public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {

    /** Service port. */
    private int                port       = 12000;

    private ServerSocket       server;

    // Thread pool that runs the request handlers
    @Autowired
    private Executor           executorService;

    // Maps an interface name to the bean that implements it
    public Map<String, Object> handlerMap = new ConcurrentHashMap<>();

    private void publishedService() throws Exception {
        server = new ServerSocket(port);
        // Keep serving until stop() closes the server socket
        while (!server.isClosed()) {
            try {
                // Accept the next client connection
                final Socket socket = server.accept();
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        handle(socket);
                    }
                });
            } catch (Exception e) {
                // accept() fails with an exception when stop() closes the socket; report anything else
                if (!server.isClosed()) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void handle(Socket socket) {
        // try-with-resources closes the streams and the socket in reverse order
        try (Socket s = socket;
                ObjectInputStream input = new ObjectInputStream(s.getInputStream());
                ObjectOutputStream output = new ObjectOutputStream(s.getOutputStream())) {
            // Interface name
            String interfaceName = input.readUTF();
            // Method name
            String methodName = input.readUTF();
            // Parameter types and argument values
            Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
            Object[] arguments = (Object[]) input.readObject();
            try {
                Object service = handlerMap.get(interfaceName);
                Method method = service.getClass().getMethod(methodName, parameterTypes);
                Object result = method.invoke(service, arguments);
                output.writeObject(result);
            } catch (Throwable t) {
                // Send the failure back to the caller instead of a result
                output.writeObject(t);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void init() {
    }

    /**
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Publish the service. Note: this call blocks the calling thread, because publishedService() loops until the server is stopped.
        publishedService();
    }

    /**
     * @see org.springframework.context.Lifecycle#start()
     */
    @Override
    public void start() {
    }

    /**
     * @see org.springframework.context.Lifecycle#stop()
     */
    @Override
    public void stop() {
        if (server != null) {
            try {
                server.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @see org.springframework.context.Lifecycle#isRunning()
     */
    @Override
    public boolean isRunning() {
        return false;
    }

    /**
     * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Collect every bean annotated with @SRPC and register it under its interface name
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
        System.out.println(serviceBeanMap);
        if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

    /**
     * Setter method for property <tt>executorService</tt>.
     *
     * @param executorService value to be assigned to property executorService
     */
    public void setExecutorService(Executor executorService) {
        this.executorService = executorService;
    }
}
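import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Component;

/**
 * @author zhangwei_david
 * @version $Id: SRPC.java, v 0.1 2015-08-08 12:51:17 zhangwei_david Exp $
 */
@Documented
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface SRPC {

    // The service interface under which the annotated bean is registered
    public Class<?> interf();
}
At this point the services are discovered and registered automatically. Of course, this automatic registration only works within a single machine.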
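The registration and dispatch steps can be tried without Spring or sockets. The following is only a minimal sketch: the annotation `Exported`, the `Greeter` service, and the class `RegistrySketch` are hypothetical names invented for this illustration, and the explicit `register` call stands in for the lookup the server performs through Spring's `getBeansWithAnnotation`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistrySketch {

    /** Hypothetical stand-in for the SRPC annotation (without Spring's @Component). */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Exported {
        Class<?> interf();
    }

    /** Hypothetical service interface. */
    interface Greeter {
        String greet(String name);
    }

    @Exported(interf = Greeter.class)
    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "hello, " + name;
        }
    }

    // Interface name -> implementing bean, like handlerMap in the server
    static final Map<String, Object> HANDLERS = new ConcurrentHashMap<>();

    /** Registers every bean that carries the annotation under its interface name. */
    static void register(Object... beans) {
        for (Object bean : beans) {
            Exported meta = bean.getClass().getAnnotation(Exported.class);
            if (meta != null) {
                HANDLERS.put(meta.interf().getName(), bean);
            }
        }
    }

    /** Looks up the bean by interface name and invokes the method reflectively. */
    static Object dispatch(String interfaceName, String methodName, Class<?>[] types, Object[] args) {
        Object service = HANDLERS.get(interfaceName);
        if (service == null) {
            throw new IllegalArgumentException("no service registered for " + interfaceName);
        }
        try {
            Method method = service.getClass().getMethod(methodName, types);
            return method.invoke(service, args);
        } catch (InvocationTargetException e) {
            // The service method itself failed: surface its own exception as the cause
            throw new IllegalStateException(e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        register(new GreeterImpl());
        Object result = dispatch(Greeter.class.getName(), "greet",
            new Class<?>[] { String.class }, new Object[] { "world" });
        System.out.println(result);
    }
}
```

Unlike the server, which lets a missing service surface as a NullPointerException that is sent back to the client, this sketch rejects an unknown interface name explicitly.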
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

/**
 * @author zhangwei_david
 * @version $Id: Client.java, v 0.1 2015-08-08 12:28:44 zhangwei_david Exp $
 */
public class Client {

    /**
     * Refers to a remote service.
     *
     * @param <T> generic type of the interface
     * @param interfaceClass interface type
     * @param host server host name
     * @param port server port
     * @return the remote service
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("A service interface must be specified");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("The server address and port must be specified");
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, new InvocationHandler() {

                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                    // try-with-resources closes the stream and the socket in reverse order
                    try (Socket socket = new Socket(host, port);
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {
                        // Write the call in the order the server reads it
                        output.writeUTF(interfaceClass.getName());
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(arguments);
                        output.flush();
                        try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {
                            Object result = input.readObject();
                            // The server sends a Throwable instead of a result when the call failed
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        }
                    }
                }
            });
    }
}
The code above implements a simple RPC using only JDK sockets and Java serialization for the transport (Spring is used only for wiring). The next optimization adds request and response objects, which together define the RPC protocol.
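The client hinges on java.lang.reflect.Proxy: every call on the returned object is funnelled into a single `invoke` method, which sees the method name, parameter types, and arguments. The sketch below isolates that mechanism. `ProxySketch`, `Calculator`, and the `CALLS` log are hypothetical names, and the handler calls a local target directly where a real client would write to a socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProxySketch {

    /** Hypothetical service interface. */
    interface Calculator {
        int add(int a, int b);
    }

    // Records what an RPC client would put on the wire for each call
    static final List<String> CALLS = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T refer(Class<T> interfaceClass, T target) {
        InvocationHandler handler = (proxy, method, args) -> {
            CALLS.add(interfaceClass.getName() + "#" + method.getName()
                      + Arrays.toString(method.getParameterTypes()));
            try {
                // Assumption for this sketch: call the local target instead of a remote server
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                // Rethrow the exception raised by the target itself
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, handler);
    }

    public static void main(String[] args) {
        Calculator local = (a, b) -> a + b;
        Calculator remote = refer(Calculator.class, local);
        System.out.println(remote.add(2, 3));
        System.out.println(CALLS);
    }
}
```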
/** * * @author zhangwei_david * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ */ public class SrpcRequest implements Serializable { /** */ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>interfaceName</tt>. * * @return property value of interfaceName */ public String getInterfaceName() { return interfaceName; } /** * Setter method for property <tt>interfaceName</tt>. * * @param interfaceName value to be assigned to property interfaceName */ public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. 
* * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. * * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } }
/** * * @author zhangwei_david * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ */ public class SrpcResponse implements Serializable { /** */ private static final long serialVersionUID = -5934073769679010930L; // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } }
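Because the request and response classes implement Serializable, a whole call travels as a single object. A minimal round trip through a byte array shows what the object streams do on the socket. `EnvelopeSketch` and its trimmed-down `Request` class are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.UUID;

class EnvelopeSketch {

    /** Hypothetical, trimmed-down counterpart of SrpcRequest. */
    static class Request implements Serializable {
        private static final long serialVersionUID = 1L;

        final String     requestId;
        final String     methodName;
        final Class<?>[] parameterTypes;
        final Object[]   parameters;

        Request(String requestId, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
            this.requestId = requestId;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.parameters = parameters;
        }
    }

    /** Serializes a value with Java serialization. */
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads back a value written by toBytes. */
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Request sent = new Request(UUID.randomUUID().toString(), "greet",
            new Class<?>[] { String.class }, new Object[] { "world" });
        Request received = (Request) fromBytes(toBytes(sent));
        // The request id survives the round trip, so a response can be matched to its request
        System.out.println(sent.requestId.equals(received.requestId));
    }
}
```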
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { // 获取RPC请求 SrpcRequest request = (SrpcRequest) input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { SrpcResponse response = doHandle(request); output.writeObject(response); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see 
org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); output.writeObject(request); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { SrpcResponse response = (SrpcResponse) input.readObject(); if (response.getError() != null && response.getError() instanceof Throwable) { throw response.getError(); } return response.getResult(); } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
The next step optimizes serialization and switches the transport to NIO.
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private Selector selector; private ServerSocketChannel serverSocketChannel; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { // 一直服务 for (;;) { try { //超时1s selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); SrpcRequest request = SerializationUtil.deserializer(bytes, SrpcRequest.class); SrpcResponse response = doHandle(request); doWriteResponse(sc, response); } else if (readBytes < 0) { key.cancel(); sc.close(); } } } } private void doWriteResponse(SocketChannel channel, SrpcResponse response) throws IOException { if (response == null) { return; } byte[] bytes = SerializationUtil.serializer(response); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } /** * * @throws IOException * @throws ClosedChannelException */ private void init() throws 
IOException, ClosedChannelException { // 打开socketChannel serverSocketChannel = ServerSocketChannel.open(); //设置非阻塞模式 serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 创建selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); if (StringUtils.isBlank(request.getRequestId())) { response.setError(new IllegalArgumentException("request id must be not null")); } response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { init(); //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); 
handlerMap.put(interfaceName, serviceBean); } } } }
/**
 * @author zhangwei_david
 * @version $Id: Client.java, v 0.1 2015-08-08 12:28:44 zhangwei_david Exp $
 */
public class Client {

    /**
     * Refers to a remote service.
     *
     * @param <T> generic type of the interface
     * @param interfaceClass interface type
     * @param host server host name
     * @param port server port
     * @return the remote service
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("A service interface must be specified");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("The server address and port must be specified");
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, new InvocationHandler() {

                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                    // Build the request
                    SrpcRequest request = new SrpcRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setInterfaceName(interfaceClass.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(arguments);
                    SrpcResponse response = sendReqeust(request, host, port);
                    // A missing response, or one whose id does not match the request, yields null
                    if (response == null
                        || !StringUtils.equals(request.getRequestId(), response.getRequestId())) {
                        return null;
                    }
                    if (response.getError() != null) {
                        throw response.getError();
                    }
                    return response.getResult();
                }
            });
    }

    public static SrpcResponse sendReqeust(SrpcRequest request, String host, int port) throws IOException {
        // try-with-resources closes the channel even if writing or reading fails
        try (SocketChannel socketChannel = connect(host, port)) {
            byte[] requestBytes = SerializationUtil.serializer(request);
            ByteBuffer writeBuffer = ByteBuffer.allocate(requestBytes.length);
            writeBuffer.put(requestBytes);
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
            return readResoponse(socketChannel);
        }
    }

    /**
     * Reads the response. This assumes that the whole response arrives in a single read
     * and fits into the 1024-byte buffer.
     *
     * @return the response, or null if the server closed the connection without sending one
     * @throws IOException
     */
    private static SrpcResponse readResoponse(SocketChannel socketChannel) throws IOException {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        if (socketChannel.read(readBuffer) != -1) {
            readBuffer.flip();
            byte[] bytes = new byte[readBuffer.remaining()];
            readBuffer.get(bytes);
            return SerializationUtil.deserializer(bytes, SrpcResponse.class);
        }
        return null;
    }

    private static SocketChannel connect(String host, int port) throws IOException {
        // Connect to the server
        InetSocketAddress socketAddress = new InetSocketAddress(host, port);
        return SocketChannel.open(socketAddress);
    }
}
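In a distributed system, services are usually deployed on several servers to improve availability and stability. To register and discover remote services automatically, we build a simple distributed RPC framework with ZooKeeper (ZK), Protocol Buffers, and Netty. The serialization code below uses the Protostuff library (ProtostuffIOUtil, RuntimeSchema).
First, a brief introduction to ZooKeeper and Protocol Buffers.
ZooKeeper grew out of a subproject of Apache Hadoop and was originally created at Yahoo. It provides an efficient and reliable coordination service for distributed applications.
Protocol Buffers are a flexible, efficient, automated mechanism for serializing structured data, similar to XML but smaller, faster, and simpler. You define your own data structure, then use generated code to read and write it. You can even update the data structure without breaking programs that are already deployed.
RPC stands for Remote Procedure Call, a protocol for invoking a procedure on a remote machine.
Java objects must be serialized before they can be transmitted over the network; here an efficient serialization framework in the Protocol Buffers family is used.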
// LinkedBuffer, ProtostuffIOUtil, Schema and RuntimeSchema come from the Protostuff library;
// Objenesis and ObjenesisStd come from the Objenesis library.

/**
 * Serialization utility.
 *
 * @author zhangwei_david
 * @version $Id: SerializationUtil.java, v 0.1 2014-12-31 17:41:35 zhangwei_david Exp $
 */
public class SerializationUtil {

    // Schemas are cached per class, because building one is comparatively expensive
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    // Creates instances without calling a constructor
    private static Objenesis                objenesis    = new ObjenesisStd(true);

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * Serializes an object.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserializes an object.
     *
     * @param data the serialized bytes
     * @param clazz the target class
     * @return the deserialized object
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = objenesis.newInstance(clazz);
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
The request object of a remote call:
/** *Rpc 请求的主体 * @author zhangwei_david * @version $Id: SrRequest.java, v 0.1 2014年12月31日 下午6:06:25 zhangwei_david Exp $ */ public class RpcRequest { // 请求Id private String requestId; // 远程调用类名称 private String className; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>className</tt>. * * @return property value of className */ public String getClassName() { return className; } /** * Setter method for property <tt>className</tt>. * * @param className value to be assigned to property className */ public void setClassName(String className) { this.className = className; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. * * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. 
* * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } /** * @see java.lang.Object#toString() */ @Override public String toString() { return "RpcRequest [requestId=" + requestId + ", className=" + className + ", methodName=" + methodName + ", parameterTypes=" + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters) + "]"; } }
The response object of a remote call:
/** *Rpc 响应的主体 * @author zhangwei_david * @version $Id: SrResponse.java, v 0.1 2014年12月31日 下午6:07:27 zhangwei_david Exp $ */ public class RpcResponse { // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } /** *如果有异常则表示失败 * @return */ public boolean isError() { return error != null; } /** * @see java.lang.Object#toString() */ @Override public String toString() { return "RpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result + "]"; } }
RPC encoding and decoding:
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * RPC decoder.
 *
 * @author zhangwei_david
 * @version $Id: RpcDecoder.java, v 0.1 2014-12-31 20:53:16 zhangwei_david Exp $
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Every frame starts with a 4-byte length header
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // Invalid length: close the connection and stop decoding
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // The body has not fully arrived yet: rewind and wait for more bytes
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = SerializationUtil.deserializer(data, genericClass);
        out.add(obj);
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * RPC encoder.
 *
 * @author zhangwei_david
 * @version $Id: RpcEncoder.java, v 0.1 2014-12-31 20:55:25 zhangwei_david Exp $
 */
@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serializer(in);
            // Write the 4-byte length header followed by the serialized body
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
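The encoder and decoder agree on a simple frame layout: a 4-byte length followed by that many bytes of payload. The same framing can be sketched with a plain java.nio.ByteBuffer, which makes the handling of partial reads easy to see. `FramingSketch` is a hypothetical class written for this illustration and is not part of the framework.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FramingSketch {

    /** Prefixes the payload with its length as a 4-byte big-endian integer. */
    static byte[] encode(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(payload.length);
        buffer.put(payload);
        return buffer.array();
    }

    /**
     * Extracts every complete frame from a buffer in read mode.
     * An incomplete trailing frame is left unread so that it can be retried
     * once more bytes have arrived.
     */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                // Body incomplete: rewind to the start of the length header
                in.reset();
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] ping = encode("ping".getBytes(StandardCharsets.UTF_8));
        byte[] pong = encode("pong".getBytes(StandardCharsets.UTF_8));
        // Simulate a read that delivers one full frame and a truncated second frame
        ByteBuffer wire = ByteBuffer.allocate(ping.length + pong.length - 2);
        wire.put(ping).put(pong, 0, pong.length - 2);
        wire.flip();
        for (byte[] frame : decode(wire)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
        System.out.println(wire.remaining() + " bytes kept for the next read");
    }
}
```

Without such a length prefix, a reader cannot tell where one message ends when the network splits or merges writes.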
The RPC request handler:
/**
 * RPC request handler.
 *
 * @author zhangwei_david
 * @version $Id: RpcHandler.java, v 0.1 2014-12-31 21:04:52 zhangwei_david Exp $
 */
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private static final Logger       logger = LogManager.getLogger(RpcHandler.class);

    // Maps a class name to the service bean that handles it
    private final Map<String, Object> handlerMap;

    public RpcHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        RpcResponse response = new RpcResponse();
        // Copy the request id into the response
        response.setRequestId(request.getRequestId());
        try {
            LogUtils.info(logger, "Handling request: {0}", request);
            Object result = handle(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        // Close the connection once the response has been written
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Handles the request.
     *
     * @param request the RPC request
     * @return the result of the invoked method
     * @throws Throwable
     */
    private Object handle(RpcRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);

        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        // Invoke through a generated FastClass instead of java.lang.reflect
        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
To make service registration convenient, define an annotation:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Component;

/**
 * Annotation for services exposed through the simple RPC protocol.
 *
 * @author zhangwei_david
 * @version $Id: STRService.java, v 0.1 2014-12-31 16:33:14 zhangwei_david Exp $
 */
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    String value() default "";

    // The service interface implemented by the annotated class
    Class<?> inf();
}
Registering the remote service in ZooKeeper:
/** * 简单RPC服务注册 * <ul> * 注册方法是register(),该方法的主要功能如下: * <li> 对目标服务器创建一个ZooKeeper实例</li> * <li> 如果可以成功创建ZooKeeper实例,则创建一个节点</li> * </ul> * @author zhangwei_david * @version $Id: ServiceRegistry.java, v 0.1 2014年12月31日 下午6:08:47 zhangwei_david Exp $ */ public class ServiceRegistry { // 日期记录器 private static final Logger logger = LogManager.getLogger(ServiceRegistry.class); // 使用计数器实现同步 private CountDownLatch latch = new CountDownLatch(1); private int timeout = Constant.DEFAULT_ZK_SESSION_TIMEOUT; private String registerPath = Constant.DEFAULT_ZK_REGISTRY_PATH; private String registerAddress; public void register(String data) { LogUtils.debug(logger, "注册服务{0}", data); if (data != null) { ZooKeeper zk = connectServer(); if (zk != null) { // 创建节点 createNode(zk, data); } } } /** * *创建zooKeeper * @return */ private ZooKeeper connectServer() { ZooKeeper zk = null; try { LogUtils.info(logger, "创建zk,参数是:address:{0},timeout:{1}", registerAddress, timeout); // 创建一个zooKeeper实例,第一个参数是目标服务器地址和端口,第二个参数是session 超时时间,第三个参数是节点发生变化时的回调方法 zk = new ZooKeeper(registerAddress, timeout, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { // 计数器减一 latch.countDown(); } } }); // 阻塞到计数器为0,直到节点的变化回调方法执行完成 latch.await(); } catch (Exception e) { LogUtils.error(logger, "connectServer exception", e); } // 返回ZooKeeper实例 return zk; } /** * * * @param zk ZooKeeper的实例 * @param data 注册数据 */ private void createNode(ZooKeeper zk, String data) { try { byte[] bytes = data.getBytes(); /** * 创建一个节点,第一个参数是该节点的路径,第二个参数是该节点的初始化数据,第三个参数是该节点的ACL,第四个参数指定节点的创建策略 */ String createResult = zk.create(registerPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); LogUtils.info(logger, "创建的结果是:{0}", createResult); } catch (Exception e) { LogUtils.error(logger, "createNode exception", e); } } /** * Getter method for property <tt>timeout</tt>. 
* * @return property value of timeout */ public int getTimeout() { return timeout; } /** * Setter method for property <tt>timeout</tt>. * * @param timeout value to be assigned to property timeout */ public void setTimeout(int timeout) { this.timeout = timeout; } /** * Getter method for property <tt>registerPath</tt>. * * @return property value of registerPath */ public String getRegisterPath() { return registerPath; } /** * Setter method for property <tt>registerPath</tt>. * * @param registerPath value to be assigned to property registerPath */ public void setRegisterPath(String registerPath) { this.registerPath = registerPath; } /** * Getter method for property <tt>registerAddress</tt>. * * @return property value of registerAddress */ public String getRegisterAddress() { return registerAddress; } /** * Setter method for property <tt>registerAddress</tt>. * * @param registerAddress value to be assigned to property registerAddress */ public void setRegisterAddress(String registerAddress) { this.registerAddress = registerAddress; } }
With this in place, a service can conveniently register itself in ZooKeeper at startup.
The RPC client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * RPC client.
 *
 * @author zhangwei_david
 * @version $Id: RpcClient.java, v 0.1 2014-12-31 21:18:34 zhangwei_david Exp $
 */
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

    private String       host;
    private int          port;

    // Guarded by obj
    private RpcResponse  response;

    private final Object obj = new Object();

    public RpcClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        synchronized (obj) {
            this.response = response;
            obj.notifyAll(); // Response received: wake the waiting thread
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public RpcResponse send(RpcRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                            .addLast(new RpcEncoder(RpcRequest.class)) // Encode the RPC request (to send it)
                            .addLast(new RpcDecoder(RpcResponse.class)) // Decode the RPC response (to handle it)
                            .addLast(RpcClient.this); // Let RpcClient receive the decoded response
                    }
                }).option(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().writeAndFlush(request).sync();

            synchronized (obj) {
                // Wait in a loop: the response may already have arrived before this point,
                // and wait() may also wake up spuriously
                while (response == null) {
                    obj.wait();
                }
            }
            future.channel().closeFuture().sync();
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}
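Handing a response from the I/O thread to the calling thread with wait() and notifyAll() needs care: a notification delivered before the caller starts waiting must not be lost, and a response that never arrives should not block the caller forever. One possible alternative is a CompletableFuture with a timeout, sketched below. `ResponseWaitSketch`, its methods, and the String payload are assumptions made for this illustration and are not part of the client shown above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ResponseWaitSketch {

    // Completed exactly once, by whichever thread receives the response
    private final CompletableFuture<String> pending = new CompletableFuture<>();

    /** Stand-in for channelRead0: called by the I/O thread when the response arrives. */
    void onResponse(String response) {
        pending.complete(response);
    }

    /** Called by the requesting thread; returns at once if the response is already there. */
    String await(long timeoutMillis) {
        try {
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response within " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ResponseWaitSketch call = new ResponseWaitSketch();
        // The response arrives from another thread, possibly before await() is entered
        new Thread(() -> call.onResponse("pong")).start();
        System.out.println(call.await(1000));
    }
}
```

A future remembers its result, so the order in which the two threads run no longer matters.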
RPC service discovery:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * RPC service discovery
 * @author zhangwei_david
 * @version $Id: ServiceDiscovery.java, v 0.1 2014-12-31 21:10:23 zhangwei_david Exp $
 */
public class ServiceDiscovery {

    // Logger
    private static final Logger   logger   = LogManager.getLogger(ServiceDiscovery.class);

    private CountDownLatch        latch    = new CountDownLatch(1);

    // Replaced as a whole whenever the children of the root node change
    private volatile List<String> dataList = new ArrayList<String>();

    private String                registryAddress;

    public void init() {
        LogUtils.debug(logger, "Initializing RPC service discovery...");
        ZooKeeper zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    /** Returns one registered service address chosen at random, or null if none is known. */
    public String discover() {
        String data = null;
        // Read the volatile field once so size and element come from the same snapshot
        List<String> snapshot = dataList;
        int size = snapshot.size();
        if (size > 0) {
            if (size == 1) {
                data = snapshot.get(0);
            } else {
                data = snapshot.get(ThreadLocalRandom.current().nextInt(size));
            }
        }
        return data;
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.DEFAULT_ZK_SESSION_TIMEOUT,
                new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
            // Block until the session is connected
            latch.await();
        } catch (Exception e) {
            LogUtils.error(logger, "Failed to connect to ZooKeeper", e);
        }
        LogUtils.debug(logger, "zk is {0}", zk);
        return zk;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            // The watch fires only once, so it is registered again on every change
            List<String> nodeList = zk.getChildren(Constant.ROOT, new Watcher() {
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            LogUtils.debug(logger, "zk nodes: {0}", nodeList);
            List<String> dataList = new ArrayList<String>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ROOT + node, false, null);
                dataList.add(new String(bytes));
            }
            this.dataList = dataList;
            if (dataList.isEmpty()) {
                throw new RuntimeException("No service has been registered yet");
            }
        } catch (Exception e) {
            LogUtils.error(logger, "Failed to discover nodes", e);
        }
    }

    /**
     * Setter method for property <tt>registryAddress</tt>.
     *
     * @param registryAddress value to be assigned to property registryAddress
     */
    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}
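`discover()` returns one of the registered address strings, picked at random. As a rough sketch of what a caller might do with it, the helper below picks an address and splits it into host and port. It assumes the registered data has the form `host:port`, as the `serverAddress` value `127.0.0.1:8000` in the server configuration suggests; `AddressPicker` and its methods are hypothetical names, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: chooses a discovered address and splits it into host and port. */
class AddressPicker {

    /** Picks one address at random, or returns null when nothing is registered. */
    static String pick(List<String> addresses) {
        if (addresses.isEmpty()) {
            return null;
        }
        return addresses.get(ThreadLocalRandom.current().nextInt(addresses.size()));
    }

    /** Returns the part before the last ':' (assumes a "host:port" address). */
    static String host(String address) {
        return address.substring(0, address.lastIndexOf(':'));
    }

    /** Returns the part after the last ':' as an int (assumes a "host:port" address). */
    static int port(String address) {
        return Integer.parseInt(address.substring(address.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        List<String> addresses = Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001");
        String chosen = pick(addresses);
        System.out.println(host(chosen) + " / " + port(chosen));
    }
}
```

Random selection spreads calls across providers without any shared state; other strategies such as round-robin would need a counter kept alongside the address list.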
Test:
/**
 *
 * @author zhangwei_david
 * @version $Id: HelloService.java, v 0.1 2014-12-31 21:27:28 zhangwei_david Exp $
 */
public interface HelloService {

    String hello();
}

/**
 *
 * @author zhangwei_david
 * @version $Id: HelloServiceImpl.java, v 0.1 2014-12-31 21:28:02 zhangwei_david Exp $
 */
@RpcService(value = "helloService", inf = HelloService.class)
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello() {
        return "Hello! ";
    }
}
Server-side configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
    ">

    <context:component-scan base-package="com.david.common.test"/>

    <!-- Service registry component -->
    <bean id="serviceRegistry" class="com.david.common.rpc.registry.ServiceRegistry">
        <property name="registerAddress" value="127.0.0.1:2181"/>
    </bean>

    <!-- RPC server -->
    <bean id="rpcServer" class="com.david.common.rpc.server.RpcServer">
        <property name="serverAddress" value="127.0.0.1:8000"/>
        <property name="serviceRegistry" ref="serviceRegistry"/>
    </bean>
</beans>
Client-side configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
    ">

    <context:component-scan base-package="com.david.common.*"/>

    <!-- Service discovery component; init() connects to ZooKeeper and loads the registered addresses -->
    <bean id="serviceDiscovery" class="com.david.common.rpc.discovery.ServiceDiscovery" init-method="init">
        <property name="registryAddress" value="127.0.0.1:2181"/>
    </bean>

    <!-- RPC proxy -->
    <bean id="rpcProxy" class="com.david.common.rpc.proxy.RpcProxyFactory">
        <property name="serviceDiscovery" ref="serviceDiscovery"/>
    </bean>
</beans>
Server:
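import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 *
 * @author zhangwei_david
 * @version $Id: Server.java, v 0.1 2014-12-31 21:56:37 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class Server {

    @Test
    public void helloTest() throws InterruptedException {
        System.out.println("Started");
        // Keep the Spring context, and with it the RPC server, alive for an hour
        TimeUnit.HOURS.sleep(1);
    }
}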
Client:
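import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 *
 * @author zhangwei_david
 * @version $Id: MyTest.java, v 0.1 2014-12-31 21:25:49 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:client.xml")
public class HelloServiceTest {

    @Autowired
    private RpcProxyFactory rpcProxy;

    @Test
    public void helloTest() {
        // The proxy turns the local call into an RPC request to a discovered server
        HelloService helloService = rpcProxy.create(HelloService.class);
        String result = helloService.hello();
        Assert.assertEquals("Hello! ", result);
    }
}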
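The test calls `rpcProxy.create(HelloService.class)` and then uses the result like a local object. One common way to get that effect is a JDK dynamic proxy. The sketch below illustrates the idea only and is not the actual `RpcProxyFactory`: `ProxySketch`, `Greeter`, and the `transport` function (which stands in for "build a request, send it over the network, return the result") are all hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

/** Hypothetical sketch of an interface proxy that forwards every call to a transport. */
class ProxySketch {

    /** Stand-in service interface for the example. */
    interface Greeter {
        String hello();
    }

    /**
     * Creates a proxy for the given interface. Each method call is passed to the
     * transport as "interfaceName#methodName" plus the call arguments.
     */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, BiFunction<String, Object[], Object> transport) {
        InvocationHandler handler = (proxy, method, args) ->
            transport.apply(iface.getName() + "#" + method.getName(), args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    public static void main(String[] args) {
        // The lambda plays the role of the remote server and always answers "Hello! ".
        Greeter greeter = create(Greeter.class, (target, params) -> "Hello! ");
        System.out.println(greeter.hello());
    }
}
```

In a real client the transport would look up an address through service discovery and send the request with the RPC client, so the caller only ever sees the service interface.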