1.介绍
现在都喜欢用什么高大上的东西,弄出一堆框架来。RPC即为远程过程调用协议,让两个终端之间不需再关注网络传输的实现。
这里以实现简单聊天室为目的,一步一步搭建属于自己的RPC架构。这里起名为everyw,意为eyerywhere,在任何地方都可以使用。
服务器,提供服务者;客户端,使用服务者。
2.服务端功能
聊天服务器最基本的服务就是注册用户,发送聊天信息,这里新建一个表示服务器功能的接口
/** * 服务器功能 */ public interface Api { /** * 注册用户 * @param name * @return */ Message<Void> regist(String name); /** * 发送聊天信息 * @param message */ Message<Void> sendMessage(String message); }
为了显现服务器网络连通等出现的异常,这里提供了一个类似Optional的类Message
/** * 消息体封装类 */ public class Message<T> implements java.io.Serializable { private T value; private String error; private boolean hashError; public Message( ) { } public Message(T obj) { this.value=obj; } public Message(T obj,String error) { this.value=obj; this.error=error; hashError=true; } /** * 是否有异常 * * @return */ public boolean hasError() { return hashError; } /** * 返回默认信息 * * @return */ public String getError() { return error; } /** * 有网络异常时调用 * @param action * @return */ public Message<T> error(Consumer<String> action) { if(hashError) { action.accept(error); } return this; } /** * 成功返回时调用 * @param action * @return */ public Message<T> success(Consumer<Void> action) { if(error==null) { action.accept(null); } return this; } /** * 有返回值时返回 * @param action * @return */ public Message<T> value(Consumer<T> action) { if(value!=null) { action.accept(value); } return this; } public T getValue() { return value; } /** * @param error */ public void setError(String error) { this.error=error; hashError=true; } }
为了让Message可以传输,添加了java.io.Serializable接口。
3.客户端能力
客户端有时需要向服务器展现自己的能力,方便服务器调用。一个简单的聊天室就具有接收消息,处理消息的能力。这里新建一个表示客户端能力的接口
/** * 客户端能力 */ public interface Listener { /** * 接收到消息 */ void onMessage(String msg); }
5.客户端实现
作为一个客户端,除了需要知道服务器的地址,就不需要关注其具体通信了。
public static void main(String[] args) { String url = "127.0.0.1:8080"; ApplicationContext context = ApplicationContext.getContext(url); Api api = context.getService(Api.class); Listener listener = message -> { System.out.println("消息:" + message); }; context.registListener(listener); try (Scanner br = new Scanner(System.in)) { System.out.println("请输入你的名字:"); String line = br.nextLine(); api.regist(line).error(e -> { System.err.println("注册失败:" + e); System.exit(1); }); while (true) { line = br.nextLine(); api.sendMessage(line); } } }
代码量很少很简洁,也无任何网络通信的代码。
ApplicationContext 将是重点要实现的内容,这里通过getContext获取单例应用上下文,getService获取具体服务实现,registListener则是向服务展现客户端能力。
6.服务端实现
首先需要服务的实现者,任何类只需要实现服务Api接口即可。
/** * 服务实现者 */ public static class ServerWoker implements Api { private ClientContext client; private String name; /** * * @param client * 当前客户端上下文 */ public ServerWoker(ClientContext client) { this.client = client; } @Override public Message<Void> regist(String name) { Logger.d(client +"注册用户名:"+name); this.name = name; List<ClientContext> clients = client.getServerContext().getClientContexts(); if (clients != null) { for (ClientContext c : clients) { if (name.equals(c.getSession(true).get("name"))) { return new Message<Void>(null, "用户名已存在"); } } } // 储存用户名 client.getSession(true).put("name", name); return new Message<Void>(); } @Override public Message<Void> sendMessage(String message) { Logger.d("客户端["+client +"]发送消息:"+name+":"+message); List<ClientContext> clients = client.getServerContext().getClientContexts(); if (clients != null) { clients.forEach(c -> { Listener listener = c.getListener(Listener.class); if (listener != null) { listener.onMessage(name + ":" + message); } }); } return new Message<Void>(); } }
业务逻辑也非常的清晰明了。
服务端启动代码也非常简洁容易明白。
public static void main(String[] args) { //创建服务上下文 ServerContext context = ServerContext.create(8080); //为每个客户端绑定能力 context.bindService(client -> { return new ServerWoker(client); }); context.start();// 启动服务 }
这里ServerContext为服务器上下文,包括了创建、启动服务,绑定能力,获取所有客户端上下文等功能。
ClientContext客户端上下文,可以获取使用获取客户端能力,保存使用客户信息等功能。
7.项目组织结构
在实现具体代码之前,先看一下项目结构,如下
依赖关系图,
8.客户端架构具体实现过程
ApplicationContext首先需要通过socket连接,和创建一个读取线程
private ApplicationContext(String url) { int i = url.lastIndexOf(":"); if (i == -1) { this.url = url; this.port = 80; } else { this.url = url.substring(0, i); this.port = Integer.parseInt(url.substring(i + 1)); } isStart = false; listeners=new ArrayList<>(); map=new HashMap<>(); } /** * 启动连接 * @throws IOException @throws */ private synchronized void start() { try { socket = new Socket(url, port); writer = new DataOutputStream(socket.getOutputStream()); reader = new DataInputStream(socket.getInputStream()); new ReadWorker().start(); isStart = true; } catch (IOException e) { e.printStackTrace(); } }
ApplicationContext#getService 传入的是Class对象,并不是具体实现对象。Proxy 提供用于创建动态代理类和实例的静态方法,它还是由这些方法创建的所有动态代理类的超类,通过代理反射的方式可以将抽象接口在运行时转变为具体实现类。
/** * 获取api服务 * * @param clz * @param url * @return */ public <T> T getService(Class<T> clz) { if (!isStart) { start(); } InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Exception { try { // 向服务端发送数据 long bid=System.currentTimeMillis();//消息块id StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";mth=" + method.getName()+";type="+method.getGenericReturnType() + ";length=" + args.length + ";bid="+bid+";"); Logger.d(head); if (args.length != 0) { head.append(Object2String(args)); } writer.writeUTF(head.toString()); writer.flush(); Logger.d("发送完消息"); //等待来接服务器的消息 String data =getAndwait(bid); Logger.d("接收消息"); return String2Object(data); // 服务端接收数据 } catch (Throwable ex) { ex.printStackTrace(); Message result = (Message) method.getReturnType().newInstance(); result.setError(ex.getMessage()); return result; } } }; return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[] { clz }, handler); }
将请求方法,参数封装为协议,通过DataOutputStream将实际请求通过发送出去,并在读取线程中从DataInputStream获取响应数据并转换为返回对象。
Object2String,String2Object这里采用ObjectStream的方式进行转换。
/** * 转换为Object * * @param data * @return * @throws IOException * @throws ClassNotFoundException */ public static Object String2Object(String data) throws IOException, ClassNotFoundException { byte[] bytes = Base64.getDecoder().decode(data); ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(bytes)); return oi.readObject(); } /** * 转换为字符串 * * @param obj * @return * @throws IOException */ public static String Object2String(Object[] objs) throws IOException { return Base64.getEncoder().encodeToString(Object2Bytes(objs)); } /** * 转换为byte数组 * * @param obj * @return * @throws IOException */ public static byte[] Object2Bytes(Object[] objs) throws IOException { ByteArrayOutputStream bot = new ByteArrayOutputStream(); ObjectOutputStream ot = new ObjectOutputStream(bot); for (int i = 0; i < objs.length; i++) { ot.writeObject(objs[i]); } return bot.toByteArray(); }
registListener保存注册器,以等待服务器的调用
/** * 注册能力 */ public void registListener(Object listener) { listeners.add(listener); Class clz = listener.getClass(); long bid=System.currentTimeMillis();//消息块id StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";bid="+bid+";"); try { writer.writeUTF(head.toString()); writer.flush(); } catch (IOException e) { e.printStackTrace(); } }
最后客户端读取线程读取数据,根据不同实现分发功能
public void run() { String line=null; try { while((line=reader.readUTF())!=null) { String[] datas = line.split(";"); if(datas.length==2)//服务端能力回调 { long bid=Long.parseLong(datas[0].split("=")[1]); map.put(bid, datas[1]); Logger.d("服务端能力回调:"+bid); synchronized(lock) { lock.notifyAll(); } }else//服务器调用客户端能力 { String cls = datas[0].split("=")[1]; Class clz = Class.forName(cls); String mth = datas[1].split("=")[1]; String type = datas[2].split("=")[1]; int alen = Integer.parseInt(datas[3].split("=")[1]); long bid = Long.parseLong(datas[4].split("=")[1]); Logger.d("调用客户端能力:"+datas[0]+";"+datas[1]); Object[] args = null; if (alen > 0) { args = String2Objects(datas[5], alen); } for (Object listener : listeners) { Class lcz = listener.getClass(); if (clz.isAssignableFrom(lcz)) { try { Method ath = findMethod(lcz, mth, type, alen); Logger.d("找到能力方法:"+ath); ath.invoke(listener, args); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { e.printStackTrace(); } } } } } } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } }
9.服务端架构具体实现过程
首先服务器开启ServerSocket监听,处理来接客户端的连接。
@Override public void run() { Logger.d("正在启动服务"); try (ServerSocket server = new ServerSocket(port);) { while (true) { Socket socket = server.accept(); Logger.d("新客户端连接"); ClientContext client = new ClientContext(this, socket,funs); clients.add(client); } } catch (IOException e) { e.printStackTrace(); } }
bindService保存服务具体实现对象,连接时以传递给每个新客户端上下文
/** * 暴露服务给客户端 * * @param fun */ public void bindService(Function<ClientContext, Object> fun) { this.funs.add(fun); }
ClientContext客户端上下文,包括了处理读写信息的转换,服务能力的回调,创建读取处理线程等
/** * @param serverContext * @param socket * @param funs * @throws IOException */ ClientContext(ServerContext serverContext, Socket socket, List<Function<ClientContext, Object>> funs) throws IOException { this.serverContext = serverContext; this.socket = socket; writer = new DataOutputStream(socket.getOutputStream()); reader = new DataInputStream(socket.getInputStream()); // 初始化能力 apis = new ArrayList<>(); for (Function<ClientContext, Object> fun : funs) { Object api = fun.apply(this); apis.add(api); } new ReadWoker().start(); }
getListener获取客户端能力,然服务端并没有客户端具体的实现对象,同样是通过代理方式实现远程的调用
/** * 返回客户端能力 * * @param <T> * @param clz */ public <T> T getListener(Class<T> clz) { Logger.d("返回能力"); // 检验客户端是否有这能力 InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 向客户端发送数据 Logger.d("向客户端发送消息"); long bid = System.currentTimeMillis();// 消息块id StringBuffer head = new StringBuffer("clz=" + clz.getName() + ";mth=" + method.getName() + ";type=" + method.getGenericReturnType() + ";length=" + args.length + ";bid=" + bid + ";"); Logger.d(head); if (args.length != 0) { head.append(Object2String(args)); } writer.writeUTF(head.toString()); writer.flush(); // 为了简化,客户端能力不返回数据 return null; } }; return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[] { clz }, handler); }
最后在读取线程里,分发处理来自客户端不同的请求协议,调用服务端功能或者注册客户端能力
@Override public void run() { try { String line = null; while ((line = reader.readUTF()) != null) { Logger.d("接收指令"); String[] datas = line.split(";"); String cls = datas[0].split("=")[1]; if (datas.length >= 4)// 调用服务功能 { Class clz = Class.forName(cls); String mth = datas[1].split("=")[1]; String type = datas[2].split("=")[1]; int alen = Integer.parseInt(datas[3].split("=")[1]); long bid = Long.parseLong(datas[4].split("=")[1]); Logger.d("调用服务器功能:" + datas[0] + ";" + datas[1] + ";" + datas[2] + ";" + datas[3] + ";" + datas[3]); Object[] args = null; if (alen > 0) { args = String2Objects(datas[5], alen); } for (Object api : apis) { Class apz = api.getClass(); if (clz.isAssignableFrom(apz)) { try { Method ath = findMethod(apz, mth, type, alen); Object robj = ath.invoke(api, args); writer.writeUTF("bid=" + bid + ";" + Object2String(new Object[] { robj })); Logger.d("写入完毕!"); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { e.printStackTrace(); // TODO 异常放入返回? } break; } } } else// 注册客户端功能 { long bid = Long.parseLong(datas[1].split("=")[1]); Logger.d("注册客服端能力:" + cls); } } } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }finally { serverContext.getClientContexts().remove(ClientContext.this); } }
10.运行结构
运行demo服务端,运行多个demo客户端,实现简单聊天室功能
相关推荐
在这个场景中,你提到的是你自己编写了一个RPC框架,这是一项技术挑战,因为RPC框架涉及到网络通信、序列化、服务注册与发现等多个关键领域。 在你提供的博客链接中...
本篇将详细讲解如何使用socket、反射和序列化等技术来实现一个简单的RPC框架。 首先,让我们了解RPC的基本原理。在RPC模型中,客户端(Client)发起一个函数调用请求,这个请求包含了目标函数的名称和参数。RPC框架...
本教程将通过一个简单的Demo来讲解如何在自己的项目中利用Hadoop的RPC框架。 首先,理解RPC的基本概念是非常必要的。RPC允许一个程序调用另一个远程程序,就像调用本地函数一样,无需关心远程调用的细节。Hadoop的...
在实现过程中,你可能会遇到如网络编程、线程模型、协议设计、异常处理等问题,这些都是构建RPC框架时需要考虑的关键点。通过这个项目,你可以深入理解RPC的工作原理,同时提升对Netty框架的应用能力。
本篇将深入探讨RPC框架的底层模拟,主要围绕以下几个核心概念进行讨论: 1. **服务接口与实现**: 在`HelloService.java`中定义了服务接口,如`sayHello(String name)`方法,而`HelloServiceImpl.java`则是该接口...
在这个“JAVA实现简单RPC框架”的项目中,我们将探讨如何利用Java的核心特性来构建这样的框架。以下是关键知识点的详细说明: 1. **JDK动态代理**: JDK动态代理是Java提供的一种机制,可以在运行时创建一个实现了...
nfs-rpc框架很可能就是这样一种解决方案,它在处理大规模并发请求时具备高性能,适合于构建高可用、高并发的分布式应用。 高性能通常涉及到以下几个方面: 1. **并发处理**:nfs-rpc可能采用了多线程或者异步IO...
在Java开发中,使用JSON-RPC框架如`xincao9-jsonrpc-686fade`,开发者可以快速构建分布式服务。该框架通常会提供以下功能: 1. **自动序列化和反序列化**:将Java对象转换为JSON,以及将接收到的JSON数据还原为Java...
本篇将介绍如何从零开始构建一个简单的RPC框架,并关注其中的失败策略。 首先,RPC框架的核心在于客户端和服务端之间的通信。在这个例子中,框架是基于Netty实现的,Netty是一个高效的异步事件驱动的网络应用程序...
深入研究这个源码可以帮助我们理解RPC的工作原理,并且可能为开发自己的RPC框架提供灵感和实践基础。 总结来说,自定义实现RPC框架是一项涉及多方面技术的任务,包括网络编程、序列化、服务发现和错误处理等。通过...
总结以上,Java实现的RPC框架综合运用了JDK动态代理、NIO Socket通信、反射、注解、Protostuff序列化以及Zookeeper分布式协调服务,构建了一套高效、灵活的远程调用解决方案。通过理解这些核心技术,开发者可以更好...
在RPC框架中,Netty可以作为传输层,负责建立网络连接,接收和发送数据包,确保数据的可靠传输。 Zookeeper是一个分布式协调服务,它为分布式应用提供了高可用、高性能的配置管理、命名服务、分布式锁和组服务等。...
构建RPC框架是一个复杂的系统工程,涉及到的知识点和实现细节非常丰富。上述内容仅作为一个大致的概述,每一步背后都有大量的细节需要解决。实现一个高性能、高可用的RPC框架需要对网络通信、多线程编程、服务治理等...
轻量级RPC框架是指在不引入过多复杂性的情况下,设计用于快速开发和部署分布式应用的框架。在这个场景中,我们将讨论如何利用Zookeeper、Socket和Java动态代理来构建这样一个框架。 Zookeeper是Apache Hadoop的一个...
RPC(Remote Procedure Call)框架...总结,RPC框架是构建分布式系统的关键技术,QiuRPC作为一个开源的RPC框架,提供了学习和实践的机会。通过深入学习其源码,可以增强对RPC原理的理解,同时为解决实际问题提供思路。
通过上述技术,我们可以构建出一个基本的RPC框架。例如,在给定的LCRPCProgram中,可能包含了实现这些功能的源代码,如Netty的服务器和客户端实现,序列化工具类,服务注册与发现模块,以及异常处理和日志记录部分。...
这个项目虽然简单,但它涵盖了构建RPC框架的基本要素,对于理解RPC的工作原理和Java网络编程有很大的帮助。开发者可以通过这个项目学习如何从零开始实现一个简单的分布式服务框架,并进一步优化和扩展其功能。
综上所述,这个Java RPC框架利用了Netty的高性能网络通信能力,Zookeeper的分布式协调服务,以及Spring的依赖管理和企业级功能,构建了一个高效、灵活的服务治理解决方案。开发者可以利用这个框架轻松地实现服务间的...
本项目"适用于桌面开发的JSONRPC框架"提供了一个原型实现,旨在集成JSONRPC协议和JSON反序列化功能,帮助开发者快速构建桌面应用的通信基础设施。以下是对这个框架的关键知识点的详细解析: 1. **JSONRPC协议**:...
基于Dubbo实现的RPC框架,是Java开发中常见的一种高效率、高性能的服务治理方案,尤其在微服务架构中广泛应用。 Dubbo是由阿里巴巴开源的高性能RPC框架,它提供了服务注册、服务发现、负载均衡、流量控制、熔断降级...