在看hadoop的源代码的时候,看到hadoop实现了一个自定义的RPC,于是有了自己写代码实现RPC的想法。
RPC的全名Remote Process Call,即远程过程调用。使用RPC,可以像使用本地的程序一样使用远程服务器上的程序。下面是一个简单的RPC 调用实例,从中可以看到RPC如何使用以及好处:
- public class MainClient {
- public static void main(String[] args) {
- Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382);
- System.out.println(echo.echo("hello,hello"));
- }
- }
public class MainClient {
public static void main(String[] args) {
Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382);
System.out.println(echo.echo("hello,hello"));
}
}
- public interface Echo {
- public String echo(String string);
- }
public interface Echo {
public String echo(String string);
}
使用RPC.getProxy生成接口Echo的代理实现类。然后就可以像使用本地的程序一样来调用Echo中的echo方法。
使用RPC的好处是简化了远程服务访问。提高了开发效率。在分发代码时,只需要将接口分发给客户端使用,在客户端看来只有接口,没有具体类实现。这样保证了代码的可扩展性和安全性。
在看了RPCClient如何使用,我们再来定义一个RPC服务器的接口,看看服务器都提供什么操作:
- public interface Server {
- public void stop();
- public void start();
- public void register(Class interfaceDefiner,Class impl);
- public void call(Invocation invo);
- public boolean isRunning();
- public int getPort();
- }
public interface Server {
public void stop();
public void start();
public void register(Class interfaceDefiner,Class impl);
public void call(Invocation invo);
public boolean isRunning();
public int getPort();
}
服务器提供了start和stop方法。使用register注册一个接口和对应的实现类。call方法用于执行Invocation指定的接口的方法名。isRunning返回了服务器的状态,getPort()则返回了服务器使用的端口。
来看看Invocation的定义:
- public class Invocation implements Serializable{
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private Class interfaces;
- private Method method;
- private Object[] params;
- private Object result;
- /**
- * @return the result
- */
- public Object getResult() {
- return result;
- }
- /**
- * @param result the result to set
- */
- public void setResult(Object result) {
- this.result = result;
- }
- /**
- * @return the interfaces
- */
- public Class getInterfaces() {
- return interfaces;
- }
- /**
- * @param interfaces the interfaces to set
- */
- public void setInterfaces(Class interfaces) {
- this.interfaces = interfaces;
- }
- /**
- * @return the method
- */
- public Method getMethod() {
- return method;
- }
- /**
- * @param method the method to set
- */
- public void setMethod(Method method) {
- this.method = method;
- }
- /**
- * @return the params
- */
- public Object[] getParams() {
- return params;
- }
- /**
- * @param params the params to set
- */
- public void setParams(Object[] params) {
- this.params = params;
- }
- @Override
- public String toString() {
- return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")";
- }
- }
public class Invocation implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private Class interfaces;
private Method method;
private Object[] params;
private Object result;
/**
* @return the result
*/
public Object getResult() {
return result;
}
/**
* @param result the result to set
*/
public void setResult(Object result) {
this.result = result;
}
/**
* @return the interfaces
*/
public Class getInterfaces() {
return interfaces;
}
/**
* @param interfaces the interfaces to set
*/
public void setInterfaces(Class interfaces) {
this.interfaces = interfaces;
}
/**
* @return the method
*/
public Method getMethod() {
return method;
}
/**
* @param method the method to set
*/
public void setMethod(Method method) {
this.method = method;
}
/**
* @return the params
*/
public Object[] getParams() {
return params;
}
/**
* @param params the params to set
*/
public void setParams(Object[] params) {
this.params = params;
}
@Override
public String toString() {
return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")";
}
}
具体服务器实现类中的call方法是这样使用Invocation的:
- @Override
- public void call(Invocation invo) {
- Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类
- if(obj!=null) {
- try {
- Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
- Object result = m.invoke(obj, invo.getParams());
- invo.setResult(result);
- } catch (Throwable th) {
- th.printStackTrace();
- }
- } else {
- throw new IllegalArgumentException("has no these class");
- }
- }
@Override
public void call(Invocation invo) {
Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类
if(obj!=null) {
try {
Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
Object result = m.invoke(obj, invo.getParams());
invo.setResult(result);
} catch (Throwable th) {
th.printStackTrace();
}
} else {
throw new IllegalArgumentException("has no these class");
}
}
下面来看服务器接收连接并处理连接请求的核心代码:
- public class Listener extends Thread {
- private ServerSocket socket;
- private Server server;
- public Listener(Server server) {
- this.server = server;
- }
- @Override
- public void run() {
- System.out.println("启动服务器中,打开端口" + server.getPort());
- try {
- socket = new ServerSocket(server.getPort());
- } catch (IOException e1) {
- e1.printStackTrace();
- return;
- }
- while (server.isRunning()) {
- try {
- Socket client = socket.accept();
- ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
- Invocation invo = (Invocation) ois.readObject();
- server.call(invo);
- ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
- oos.writeObject(invo);
- oos.flush();
- oos.close();
- ois.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- try {
- if (socket != null && !socket.isClosed())
- socket.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
public class Listener extends Thread {
private ServerSocket socket;
private Server server;
public Listener(Server server) {
this.server = server;
}
@Override
public void run() {
System.out.println("启动服务器中,打开端口" + server.getPort());
try {
socket = new ServerSocket(server.getPort());
} catch (IOException e1) {
e1.printStackTrace();
return;
}
while (server.isRunning()) {
try {
Socket client = socket.accept();
ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
Invocation invo = (Invocation) ois.readObject();
server.call(invo);
ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
oos.writeObject(invo);
oos.flush();
oos.close();
ois.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
if (socket != null && !socket.isClosed())
socket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
RPC具体的Server类是这样来使用Listener的:
- public static class RPCServer implements Server{
- private int port = 20382;
- private Listener listener;
- private boolean isRuning = true;
- /**
- * @param isRuning the isRuning to set
- */
- public void setRuning(boolean isRuning) {
- this.isRuning = isRuning;
- }
- /**
- * @return the port
- */
- public int getPort() {
- return port;
- }
- /**
- * @param port the port to set
- */
- public void setPort(int port) {
- this.port = port;
- }
- private Map<String ,Object> serviceEngine = new HashMap<String, Object>();
- @Override
- public void call(Invocation invo) {
- System.out.println(invo.getClass().getName());
- Object obj = serviceEngine.get(invo.getInterfaces().getName());
- if(obj!=null) {
- try {
- Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
- Object result = m.invoke(obj, invo.getParams());
- invo.setResult(result);
- } catch (Throwable th) {
- th.printStackTrace();
- }
- } else {
- throw new IllegalArgumentException("has no these class");
- }
- }
- @Override
- public void register(Class interfaceDefiner, Class impl) {
- try {
- this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());
- System.out.println(serviceEngine);
- } catch (Throwable e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- @Override
- public void start() {
- System.out.println("启动服务器");
- listener = new Listener(this);
- this.isRuning = true;
- listener.start();
- }
- @Override
- public void stop() {
- this.setRuning(false);
- }
- @Override
- public boolean isRunning() {
- return isRuning;
- }
- }
public static class RPCServer implements Server{
private int port = 20382;
private Listener listener;
private boolean isRuning = true;
/**
* @param isRuning the isRuning to set
*/
public void setRuning(boolean isRuning) {
this.isRuning = isRuning;
}
/**
* @return the port
*/
public int getPort() {
return port;
}
/**
* @param port the port to set
*/
public void setPort(int port) {
this.port = port;
}
private Map<String ,Object> serviceEngine = new HashMap<String, Object>();
@Override
public void call(Invocation invo) {
System.out.println(invo.getClass().getName());
Object obj = serviceEngine.get(invo.getInterfaces().getName());
if(obj!=null) {
try {
Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
Object result = m.invoke(obj, invo.getParams());
invo.setResult(result);
} catch (Throwable th) {
th.printStackTrace();
}
} else {
throw new IllegalArgumentException("has no these class");
}
}
@Override
public void register(Class interfaceDefiner, Class impl) {
try {
this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());
System.out.println(serviceEngine);
} catch (Throwable e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void start() {
System.out.println("启动服务器");
listener = new Listener(this);
this.isRuning = true;
listener.start();
}
@Override
public void stop() {
this.setRuning(false);
}
@Override
public boolean isRunning() {
return isRuning;
}
}
服务器端代码搞定后,来看看客户端的代码,先看看我们刚开始使用RPC.getProxy方法:
- public static <T> T getProxy(final Class<T> clazz,String host,int port) {
- final Client client = new Client(host,port);
- InvocationHandler handler = new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- Invocation invo = new Invocation();
- invo.setInterfaces(clazz);
- invo.setMethod(new org.jy.rpc.protocal.Method(method.getName(),method.getParameterTypes()));
- invo.setParams(args);
- client.invoke(invo);
- return invo.getResult();
- }
- };
- T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
- return t;
- }
public static <T> T getProxy(final Class<T> clazz,String host,int port) {
final Client client = new Client(host,port);
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invo = new Invocation();
invo.setInterfaces(clazz);
invo.setMethod(new org.jy.rpc.protocal.Method(method.getName(),method.getParameterTypes()));
invo.setParams(args);
client.invoke(invo);
return invo.getResult();
}
};
T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
return t;
}
Client类的代码如下:
- public class Client {
- private String host;
- private int port;
- private Socket socket;
- private ObjectOutputStream oos;
- private ObjectInputStream ois;
- public String getHost() {
- return host;
- }
- public void setHost(String host) {
- this.host = host;
- }
- public int getPort() {
- return port;
- }
- public void setPort(int port) {
- this.port = port;
- }
- public Client(String host, int port) {
- this.host = host;
- this.port = port;
- }
- public void init() throws UnknownHostException, IOException {
- socket = new Socket(host, port);
- oos = new ObjectOutputStream(socket.getOutputStream());
- }
- public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {
- init();
- System.out.println("写入数据");
- oos.writeObject(invo);
- oos.flush();
- ois = new ObjectInputStream(socket.getInputStream());
- Invocation result = (Invocation) ois.readObject();
- invo.setResult(result.getResult());
- }
- }
public class Client {
private String host;
private int port;
private Socket socket;
private ObjectOutputStream oos;
private ObjectInputStream ois;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public Client(String host, int port) {
this.host = host;
this.port = port;
}
public void init() throws UnknownHostException, IOException {
socket = new Socket(host, port);
oos = new ObjectOutputStream(socket.getOutputStream());
}
public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {
init();
System.out.println("写入数据");
oos.writeObject(invo);
oos.flush();
ois = new ObjectInputStream(socket.getInputStream());
Invocation result = (Invocation) ois.readObject();
invo.setResult(result.getResult());
}
}
至此,RPC的客户端和服务器端代码完成,启动服务器的代码如下:
- public class Main {
- public static void main(String[] args) {
- Server server = new RPC.RPCServer();
- server.register(Echo.class, RemoteEcho.class);
- server.start();
- }
- }
public class Main {
public static void main(String[] args) {
Server server = new RPC.RPCServer();
server.register(Echo.class, RemoteEcho.class);
server.start();
}
}
现在先运行服务器端代码,再运行客户端代码,就可以成功运行。
详细的代码,参考附件的源代码。
在写这个RPC时,没有想太多。在数据串行化上,使用了java的标准io序列化机制,虽然不能跨平台,但是做DEMO还是不错的;另外在处理客户端请求上,使用了ServerSocket,而没有使用ServerSocketChannel这个java nio中的新特性;在动态生成接口的实现类上,使用了java.lang.reflet中的Proxy类。他可以动态创建接口的实现类。
相关推荐
### 自定义RPC的Java实现详解 #### 一、引言 在现代软件开发中,分布式系统变得越来越普遍,其中远程过程调用(RPC)技术因其简单高效的特点而在跨网络节点间进行服务通信方面发挥着重要作用。本文将详细介绍如何...
本项目是基于Netty 4.0实现的自定义RPC通信框架,旨在为Java开发者提供一种高效、灵活的远程服务调用解决方案。 首先,我们来深入理解Netty的基本概念。Netty的核心是其NIO(非阻塞I/O)模型,它使用了Reactor模式...
本项目是一个基于Java、Etcd和Vert.x框架实现的自定义RPC(远程过程调用)框架。开发者可以通过引入Spring Boot Starter,使用注解和配置文件快速使用该框架,实现像调用本地方法一样轻松调用远程服务。框架支持通过...
本项目"Netty实现自定义RPC"旨在教你如何利用Netty构建自己的RPC框架。 首先,我们需要理解RPC的基本原理。RPC的核心是将远程服务调用的过程透明化,使它看起来就像是本地方法调用一样。这涉及到序列化、网络通信和...
本项目基于Java、Netty和Zookeeper实现了一个自定义的RPC框架,这三个组件各自在RPC框架中扮演着关键角色。 Java作为编程语言,提供了丰富的类库和API,使得开发高效稳定的服务器端应用成为可能。Netty则是一个高...
在这个“JAVA实现简单RPC框架”的项目中,我们将探讨如何利用Java的核心特性来构建这样的框架。以下是关键知识点的详细说明: 1. **JDK动态代理**: JDK动态代理是Java提供的一种机制,可以在运行时创建一个实现了...
在Netty中,实现自定义RPC模块通常包括以下步骤: 1. 定义服务接口:这是RPC调用的核心,包含客户端需要调用的方法。 2. 服务端实现:服务提供者实现这个接口,并暴露服务。 3. 编码与解码:为了通过网络传输,我们...
guide-rpc-framework ...通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/项目经验的选择,这是非常不错!对比其他求职者的项目经验都是各种系统,造轮子
Java实现RPC(Remote Procedure Call)框架是一个复杂而深入的话题,涉及到多个关键的技术点。RPC框架允许程序通过网络调用远程机器上的方法,就像调用本地方法一样,极大地简化了分布式系统开发。以下将详细讲解在...
实现自定义RPC的关键步骤包括: 1. **序列化与反序列化**:选择合适的序列化工具,如JSON、Protobuf或Hessian,将Java对象转换为字节流在网络中传输。 2. **编解码器**:实现自定义的编码器和解码器,将网络消息转化...
Phprpc是一个跨语言的远程过程调用(RPC)框架,它允许PHP和Java应用程序之间进行无缝通信。在本实例中,我们将深入探讨如何在Java环境中设置和使用Phprpc服务器,以便理解其核心概念和操作流程。 1. **Phprpc简介*...
guide-rpc-framework 是一款基于 Netty+Kyro+Zookeeper 实现的 RPC 框架。代码注释详细,结构清晰,并且集成了 Check Style 规范代码结构,非常适合阅读和学习。
在Java中实现一个简单的RPC框架,我们需要理解以下几个关键概念和技术: 1. **网络通信**:RPC的核心是通过网络进行通信。在Java中,我们可以使用Socket API来实现客户端和服务器端之间的数据传输。Socket提供了低...
2. "JSON-RPC.jar":这是编译后的Java库文件,包含了JSON-RPC for Java的核心组件,可以直接在Java项目中引用以实现JSON-RPC通信。 3. "JsonRpcClient.js":这可能是JavaScript版本的客户端库,用于在浏览器端或Node...
Java语言游戏项目实战资源包 内容概览: 这次分享为你带来了丰富的Java语言游戏项目实战资源,让你在实践中深入掌握Java语言,并开启游戏开发之旅。资源包中包括: 游戏项目代码:精心挑选了多个经典的小游戏项目...
通过上述知识点,我们可以构建一个基本的自定义RPC框架。这个框架不仅能够实现服务间的远程调用,还具有一定的扩展性和稳定性,可以满足不同分布式系统的需要。当然,实际的RPC框架可能还需要考虑更多的细节,如...
在JAVA中实现RPC,我们需要理解以下几个关键知识点: 1. **远程调用原理**:RPC的核心思想是让客户端可以像调用本地方法一样调用远程服务器上的方法,中间过程对用户透明。它通过网络协议将请求发送到远程服务,...