基于Java的Socket API,我们能够实现一个简单的RPC调用,在这个例子中,包括了服务的接口及接口的远端实现,服务的消费者与远端的提供方。基于TCP协议所实现的RPC的类图,如下:
项目的目录结构如下:
1.首先编码服务端代码:
①定义接口
package com.bjsxt.tcp; public interface SayHelloService { /* * 问好的接口 * */ public String sayHello(String helloArg); }
②接口实现
package com.bjsxt.tcp; public class SayHelloServiceImpl implements SayHelloService{ public String sayHello(String helloArg) { if(helloArg.equals("hello")){ return "hello"; }else{ return "bye bye"; } } }
③定义消息体
package com.bjsxt.tcp; import java.io.Serializable; /** * 远程调用信息封装(包含.1.调用接口名称(包名+接口名) 2.调用方法名 3.调用参数Class类型数组) * @author 316311 * */ public class TransportMessage implements Serializable { //包名+接口名 private String interfaceName; //方法名 private String methodName; //参数类型 按照接口参数顺序 private Class[] parameterTypes; //参数 按照接口参数顺序 private Object[] parameters; public TransportMessage(){ super(); } public TransportMessage(String interfaceName, String methodName, Class[] parameterTypes, Object[] parameters){ this.interfaceName = interfaceName; this.methodName = methodName; this.parameterTypes = parameterTypes; this.parameters = parameters; } public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } }
④定义服务端代码,简单客户端请求
package com.bjsxt.tcp; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Provider { private int threadSize = 10; private ExecutorService threadPool; private Map<String, Object> servicePool; private int port = 4321; public Provider(){ super(); synchronized(this){ threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * * @param threadSize * 内部线程池的大小 * @param port * 当前TCP服务的端口号 */ public Provider(int threadSize, int port){ this.threadSize = threadSize; this.port = port; synchronized(this){ this.threadPool = Executors.newFixedThreadPool(this.threadSize); } } public Provider(int threadSize, int port, Map<String,Object> servicePool){ this.threadSize = threadSize; this.port = port; this.servicePool = servicePool; synchronized(this){ this.threadPool = Executors.newFixedThreadPool(this.threadSize); } } /** * RPC服务端处理函数 监听指定的TPC端口,每次有请求过来的时候调用服务,放入线程池中处理 */ public void service() throws IOException{ ServerSocket serverSocket = new ServerSocket(port); while(true){ System.out.println("Provider start...."); final Socket receiveSocket = serverSocket.accept(); System.out.println("Provider end..."); threadPool.execute(new Runnable() { @Override public void run() { try { process(receiveSocket); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InstantiationException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (NoSuchMethodException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (SecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } } /* * 调用服务 通过TCP Socket返回结果对象 */ public void process(Socket receiveSocket) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException, IllegalArgumentException, InvocationTargetException{ ObjectInputStream objectInputStream = new ObjectInputStream(receiveSocket.getInputStream()); TransportMessage message = (TransportMessage)objectInputStream.readObject(); //调用服务 Object result = call(message); //返回结果 ObjectOutputStream objectOutputStream = new ObjectOutputStream(receiveSocket.getOutputStream()); objectOutputStream.writeObject(result); objectInputStream.close(); objectOutputStream.close(); } public Object call(TransportMessage message) throws ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException, IllegalArgumentException, InvocationTargetException{ //根据接口的全限定名 String interfaceName = message.getInterfaceName(); //从容器中获取服务对象 Object service = servicePool.get(interfaceName); System.out.println(service); Class<?> serviceClass = Class.forName(interfaceName); Method method = serviceClass.getMethod(message.getMethodName(), message.getParameterTypes()); Object result = method.invoke(service, message.getParameters()); return result; } }
2.编码客户端代码(客户端代码中接口定义和消息体与服务端代码一致,在此省略):
客户端代码如下:
package com.bjsxt.tcp; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Consumer { //服务端地址 private String serverAddress; //服务端端口 private int serverPort; //线程池大小 private int threadPoolSize = 10; //线程池 private ExecutorService executorService = null; public Consumer(){ } public Consumer(String serverAddress, int serverPort){ this.serverAddress = serverAddress; this.serverPort = serverPort; this.executorService = Executors.newFixedThreadPool(threadPoolSize); } /** * 同步的请求和接收结果 */ public Object sendAndReceive(TransportMessage transportMessage){ Object result = null; Socket socket = null; try { socket = new Socket(serverAddress, serverPort); //反序列化 TransportMessage对象 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(transportMessage); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); //阻塞等待读取结果并返回序列化结果对象 result = objectInputStream.readObject(); objectOutputStream.close(); objectInputStream.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally{ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } return result; } }
3.测试:
①编码服务端测试类:
package com.bjsxt.test; import java.io.IOException; import java.util.HashMap; import java.util.Map; import com.bjsxt.tcp.Provider; import com.bjsxt.tcp.SayHelloServiceImpl; public class ServerTest { /** * 启动服务端服务 * @param args */ public static void main(String[] args){ Map<String,Object> servicePool = new HashMap<String, Object>(); // servicePool.put("com.bjsxt.tcp.SayHelloService", new SayHelloServiceImpl()); Provider server = new Provider(4, 4321, servicePool); try { server.service(); } catch (Exception e) { e.printStackTrace(); } } }
②编写客户端测试类:
package com.bjsxt.test; import com.bjsxt.tcp.Consumer; import com.bjsxt.tcp.TransportMessage; public class ClientTest { public static void main(String[] args) { String serverAddress = "127.0.0.1"; int serverPort = 4321; final Consumer client = new Consumer(serverAddress, serverPort); final TransportMessage transportMessage = buildTransportMessage(); for (int i = 0; i < 3; i++) { final int waitTime = i * 10; new Thread(new Runnable() { public void run() { Object result = client.sendAndReceive(transportMessage); System.out.println(result); } }).start(); } } private static TransportMessage buildTransportMessage() { String interfaceName = "com.bjsxt.tcp.SayHelloService"; Class[] paramsTypes = {String.class }; Object[] parameters = {"hello"}; String methodName = "sayHello"; TransportMessage transportMessage = new TransportMessage(interfaceName, methodName, paramsTypes, parameters); return transportMessage; } }
③先执行服务端,后执行客户端,输出结果如下:
相关推荐
基于TCP协议的二进制RPC通信协议的Java实现源码+项目说明.zip 一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式,支持完全基于POJO的发布方式,极大的简化了开发复杂性 - 完全...
总结,Protobuf RPC是一种高效的RPC通信方式,它利用protobuf进行数据序列化,基于TCP协议保证传输的可靠性。在Java中实现Protobuf RPC涉及服务接口定义、客户端和服务端的实现、TCP连接管理等多个方面,适合于对...
总的来说,基于TCP协议的WCF调用简单程序涵盖了服务开发的基本流程,从服务端的设计、实现到客户端的调用,都是学习和掌握WCF技术的关键步骤。这种通信方式在企业级应用中非常常见,尤其是在需要高性能、高可靠性的...
《大型分布式网站架构设计与实践》主要介绍了大型...第1章主要介绍企业内部SOA(Service Oriented Architecture,即面向服务的体系结构)架构的实现,包括HTTP协议的工作原理,基于TCP协议和基于HTTP协议的RPC实现,如
在这个基于Socket的RPC实现中,`socketServer`扮演服务端的角色,它监听客户端的请求并处理这些请求;而`socketClient`则是客户端,它负责发送请求到服务器,并接收返回的结果。 1. **SocketServer的实现**: - ...
- **网络通信模块**:实现了基于TCP或HTTP的网络通信接口,用于发送和接收RPC请求。 - **示例代码**:提供了使用该库的示例,帮助开发者理解和使用库中的功能。 - **配置和构建文件**:包含了编译和链接库所需的...
Protobuf RPC是一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式,支持完全基于POJO的发布方式,极大的简化了开发复杂性。 Features: 完全支持POJO方式发布,使用非常简单 内置...
本项目是一个基于Netty框架实现的RPC(远程过程调用)服务。Netty是一个高性能、异步事件驱动的网络应用框架,适用于快速开发可维护的高性能协议服务器和客户端。本项目利用Netty的强大功能,实现了RPC服务的客户端...
#####基于TCP协议的远程过程调用框架——客户端,服务端基于TCP协议实现的一套RFC(远程过程调用)框架。 客户端和服务端均分为业务层,协议层(JSON),网络层(libevent),可以根据自己的业务定制每一层的接口; ...
本项目则是在Swoole的基础上,构建了一个基于JSON协议的远程过程调用(RPC)框架。通过此框架,开发者可以轻松地实现服务间的通信,提高系统的可扩展性和解耦性。 【描述】 这个简易的JSON-RPC框架是作者自己的实践...
xrpc基于TCP的RPC框架RPC框架包含几大部分一,通讯1,协议:TCP和HTTP2的优劣TCP包较小,有更好的传输速率,但易用性没有HTTP2好HTTP2虽然会稍微占一些流量,但由于其具有一些重叠,头部压缩等新特性,速度应该并不...
在本文中,我们将探讨基于Socket的RPC实现以及后续的改进计划,包括升级到Netty作为底层通信库,以及引入Zookeeper作为注册中心。 首先,让我们深入了解基于Socket的RPC工作原理。在客户端,我们定义了一个接口,...
SpringBoot以其简洁、快速的特性,成为了现代Java开发中的首选框架,它使得构建基于RPC的服务变得非常简单。本教程将深入探讨如何在SpringBoot中实现RPC服务。 首先,理解RPC的基本概念是必要的。RPC允许服务提供者...
根据rpc协议的思想,使用python的协程gevent实现的一个基于tcp,只能python使用的rpc协议, 不能夸语言, 不过不需要写额外的比如protobuf协议。 通过把类实例化后put到一个自定义对象中,实现方法的注册。 客户端...
Apache Mina是一个开源项目,它提供了一个高度可扩展且高性能的网络通信框架,支持多种协议,如TCP、UDP等。 首先,理解Mina的核心概念是必要的。Mina基于事件驱动和异步模型,允许开发者创建高效的网络应用。其...
例如,可以使用HTTP/1.1或HTTP/2,结合RESTful API设计原则,实现基于HTTP的RPC通信。Java的HttpURLConnection类和HttpClient库(如Apache HttpClient)可以方便地处理HTTP请求和响应。 综上所述,构建一个Java实现...
这里给出一个简单的RPC实现示例,使用Python的socket库: ```python # 服务器端 import socket def remote_service(data): return "Service result: " + data s = socket.socket(socket.AF_INET, socket.SOCK_...
TCP连接使用策略是基于TCP连接最近使用时间来判断的 关于Protocol 目前实现的协议有java自带的二进制协议和hessian协议 协议是可扩展的 关于传输协议 该框架使用自定义协议,头四个字节表示数总长度,第五个字节表示...
自定义协议是指根据特定需求设计的通信协议,它可能基于TCP、UDP或者其他传输层协议。在Netty中,你可以通过定义ByteToMessageDecoder和MessageToByteEncoder等编解码器来处理自定义协议的数据帧。这些编解码器允许...