`
weigang.gao
  • 浏览: 493602 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

基于TCP协议实现RPC

 
阅读更多

基于Java的Socket API,我们能够实现一个简单的RPC调用,在这个例子中,包括了服务的接口及接口的远端实现,服务的消费者与远端的提供方。基于TCP协议所实现的RPC的类图,如下:


项目的目录结构如下:


 

1.首先编码服务端代码:

①定义接口

package com.bjsxt.tcp;

public interface SayHelloService {

	/*
	 * 问好的接口
	 * 
	 */
	public String sayHello(String helloArg);
}

②接口实现

package com.bjsxt.tcp;

public class SayHelloServiceImpl implements SayHelloService{

	public String sayHello(String helloArg) {
		if(helloArg.equals("hello")){
			return "hello";
		}else{
			return "bye bye";
		}
	}

}

③定义消息体

package com.bjsxt.tcp;

import java.io.Serializable;

/**
 * 远程调用信息封装(包含.1.调用接口名称(包名+接口名) 2.调用方法名 3.调用参数Class类型数组)
 * @author 316311
 *
 */
public class TransportMessage implements Serializable {
	
	//包名+接口名
	private String interfaceName;
	
	//方法名
	private String methodName;
	
	//参数类型 按照接口参数顺序
	private Class[] parameterTypes;
	
	//参数 按照接口参数顺序
	private Object[] parameters;
	
	
	public TransportMessage(){
		super();
	}
	
	public TransportMessage(String interfaceName, String methodName, Class[] parameterTypes,
			Object[] parameters){
		this.interfaceName = interfaceName;
		
		this.methodName = methodName;
		
		this.parameterTypes = parameterTypes;
		
		this.parameters = parameters;
	}

	public String getInterfaceName() {
		return interfaceName;
	}

	public void setInterfaceName(String interfaceName) {
		this.interfaceName = interfaceName;
	}

	public String getMethodName() {
		return methodName;
	}

	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}

	public Class[] getParameterTypes() {
		return parameterTypes;
	}

	public void setParameterTypes(Class[] parameterTypes) {
		this.parameterTypes = parameterTypes;
	}

	public Object[] getParameters() {
		return parameters;
	}

	public void setParameters(Object[] parameters) {
		this.parameters = parameters;
	}

}

④定义服务端代码,简单客户端请求

package com.bjsxt.tcp;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Provider {
	
	private int threadSize = 10;
	
	private ExecutorService threadPool;
	
	private Map<String, Object> servicePool;
	
	private int port = 4321;
	
	public Provider(){
		super();
		synchronized(this){
			threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}
	
	/**
	 * 
	 * @param threadSize
	 *        内部线程池的大小
	 * @param port
	 *        当前TCP服务的端口号
	 */
	public Provider(int threadSize, int port){
		this.threadSize = threadSize;
		
		this.port = port;
		
		synchronized(this){
			this.threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
	}
	
	
	public Provider(int threadSize, int port, Map<String,Object> servicePool){
		this.threadSize = threadSize;
		
		this.port =  port;
		
		this.servicePool = servicePool;
		
		synchronized(this){
			this.threadPool = Executors.newFixedThreadPool(this.threadSize);
		}
		
	}
	
    /**
     * RPC服务端处理函数 监听指定的TPC端口,每次有请求过来的时候调用服务,放入线程池中处理	
     */
    public void service() throws IOException{
    	ServerSocket serverSocket = new ServerSocket(port);
    	
    	while(true){
    		System.out.println("Provider start....");
    		final Socket receiveSocket = serverSocket.accept();
    		System.out.println("Provider end...");
    		threadPool.execute(new Runnable() {
				
				@Override
				public void run() {
					try {
						process(receiveSocket);
					} catch (ClassNotFoundException e) {
						e.printStackTrace();
					} catch (InstantiationException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (IllegalAccessException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (NoSuchMethodException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (SecurityException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (IllegalArgumentException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InvocationTargetException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
			});
    	}
    }
    
    /*
     * 调用服务 通过TCP Socket返回结果对象
     */
    public void process(Socket receiveSocket) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException, IllegalArgumentException, InvocationTargetException{
    	ObjectInputStream objectInputStream = new ObjectInputStream(receiveSocket.getInputStream());
    	
    	TransportMessage message = (TransportMessage)objectInputStream.readObject();
    	
    	//调用服务
    	Object result = call(message);
    	
    	//返回结果
    	ObjectOutputStream objectOutputStream = new ObjectOutputStream(receiveSocket.getOutputStream());
        
    	objectOutputStream.writeObject(result);
    	
    	objectInputStream.close();
    	objectOutputStream.close();
    }
    
    public Object call(TransportMessage message) throws ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException, IllegalArgumentException, InvocationTargetException{
    	
    	//根据接口的全限定名
    	String interfaceName = message.getInterfaceName();
    	
    	//从容器中获取服务对象
    	Object service = servicePool.get(interfaceName);
    	System.out.println(service);
    	
    	Class<?> serviceClass = Class.forName(interfaceName);
    	
    	Method method = serviceClass.getMethod(message.getMethodName(), message.getParameterTypes());
    	
    	Object result = method.invoke(service, message.getParameters());
    	
    	return result;
    }
	
}

 

2.编码客户端代码(客户端代码中接口定义和消息体与服务端代码一致,在此省略):

客户端代码如下:

package com.bjsxt.tcp;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Consumer {
	
	//服务端地址
	private String serverAddress;
	
	//服务端端口
	private int serverPort;
	
	//线程池大小
	private int threadPoolSize = 10;
	
	//线程池
	private ExecutorService executorService = null;
	
	public Consumer(){
		
	}
	
	public Consumer(String serverAddress, int serverPort){
		
		this.serverAddress = serverAddress;
		
		this.serverPort = serverPort;
		
		this.executorService = Executors.newFixedThreadPool(threadPoolSize);
	}
	
	/**
	 * 同步的请求和接收结果
	 */
	public Object sendAndReceive(TransportMessage transportMessage){
		
		Object result = null;
		
		Socket socket = null;
		
		try {
			socket = new Socket(serverAddress, serverPort);
			
			//反序列化 TransportMessage对象
			ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
			
			objectOutputStream.writeObject(transportMessage);
			
			ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
			
			//阻塞等待读取结果并返回序列化结果对象
			result = objectInputStream.readObject();
			
			objectOutputStream.close();
			objectInputStream.close();
			
		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} finally{
			
			try {
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
		return result;
	}
}

 

3.测试:

①编码服务端测试类:

package com.bjsxt.test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.bjsxt.tcp.Provider;
import com.bjsxt.tcp.SayHelloServiceImpl;

public class ServerTest {

    
   /**
    * 启动服务端服务
    * @param args
    */
    public static void main(String[] args){

        Map<String,Object> servicePool = new  HashMap<String, Object>();
        //
        servicePool.put("com.bjsxt.tcp.SayHelloService", new SayHelloServiceImpl());

        Provider server = new Provider(4, 4321, servicePool);

        try {

            server.service();

        } catch (Exception e) {

            e.printStackTrace();

        }

         

    }
 
}

 ②编写客户端测试类:

package com.bjsxt.test;

import com.bjsxt.tcp.Consumer;
import com.bjsxt.tcp.TransportMessage;

public class ClientTest {

    public static void main(String[] args) {

        String serverAddress = "127.0.0.1";

        int serverPort = 4321;

         

        final Consumer client = new Consumer(serverAddress, serverPort);

        final TransportMessage transportMessage = buildTransportMessage();

         

        for (int i = 0; i < 3; i++) {

            final int waitTime = i * 10;

            new Thread(new Runnable() {

                public void run() {

                    Object result = client.sendAndReceive(transportMessage);

                    System.out.println(result);

                }

            }).start();

        }

    }
 

    private static TransportMessage buildTransportMessage() {
 

        String interfaceName = "com.bjsxt.tcp.SayHelloService";

        Class[] paramsTypes = {String.class };

        Object[] parameters = {"hello"};

        String methodName = "sayHello";
 

        TransportMessage transportMessage = new TransportMessage(interfaceName,
                methodName, paramsTypes, parameters);
 

        return transportMessage;

    }
 
}

 ③先执行服务端,后执行客户端,输出结果如下:


 

 

  • 大小: 10.9 KB
  • 大小: 14.7 KB
  • 大小: 3.8 KB
分享到:
评论

相关推荐

    基于TCP协议的二进制RPC通信协议的Java实现源码+项目说明.zip

    基于TCP协议的二进制RPC通信协议的Java实现源码+项目说明.zip 一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式,支持完全基于POJO的发布方式,极大的简化了开发复杂性 - 完全...

    Protobuf RPC是一种基于TCP协议的二进制RPC通信协议的Java实现.zip

    总结,Protobuf RPC是一种高效的RPC通信方式,它利用protobuf进行数据序列化,基于TCP协议保证传输的可靠性。在Java中实现Protobuf RPC涉及服务接口定义、客户端和服务端的实现、TCP连接管理等多个方面,适合于对...

    基于TCP协议的WCF调用简单程序

    总的来说,基于TCP协议的WCF调用简单程序涵盖了服务开发的基本流程,从服务端的设计、实现到客户端的调用,都是学习和掌握WCF技术的关键步骤。这种通信方式在企业级应用中非常常见,尤其是在需要高性能、高可靠性的...

    大型分布式网站架构设计与实践.带目录书签.完整版.rar

    《大型分布式网站架构设计与实践》主要介绍了大型...第1章主要介绍企业内部SOA(Service Oriented Architecture,即面向服务的体系结构)架构的实现,包括HTTP协议的工作原理,基于TCP协议和基于HTTP协议的RPC实现,如

    基于socket实现的rpc调用demo

    在这个基于Socket的RPC实现中,`socketServer`扮演服务端的角色,它监听客户端的请求并处理这些请求;而`socketClient`则是客户端,它负责发送请求到服务器,并接收返回的结果。 1. **SocketServer的实现**: - ...

    rpc远程调用库C语言实现

    - **网络通信模块**:实现了基于TCP或HTTP的网络通信接口,用于发送和接收RPC请求。 - **示例代码**:提供了使用该库的示例,帮助开发者理解和使用库中的功能。 - **配置和构建文件**:包含了编译和链接库所需的...

    Jprotobuf-rpc-socket:Protobuf RPC是一种基于TCP协议的二进制RPC通信协议的Java实现

    Protobuf RPC是一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式,支持完全基于POJO的发布方式,极大的简化了开发复杂性。 Features: 完全支持POJO方式发布,使用非常简单 内置...

    基于Netty框架的RPC服务.zip

    本项目是一个基于Netty框架实现的RPC(远程过程调用)服务。Netty是一个高性能、异步事件驱动的网络应用框架,适用于快速开发可维护的高性能协议服务器和客户端。本项目利用Netty的强大功能,实现了RPC服务的客户端...

    RPC_Framework:基于TCP协议的远程过程调用框架客户端实现

    #####基于TCP协议的远程过程调用框架——客户端,服务端基于TCP协议实现的一套RFC(远程过程调用)框架。 客户端和服务端均分为业务层,协议层(JSON),网络层(libevent),可以根据自己的业务定制每一层的接口; ...

    基于php扩展swoole封装的一个简易的JSON协议的RPC框架

    本项目则是在Swoole的基础上,构建了一个基于JSON协议的远程过程调用(RPC)框架。通过此框架,开发者可以轻松地实现服务间的通信,提高系统的可扩展性和解耦性。 【描述】 这个简易的JSON-RPC框架是作者自己的实践...

    xrpc:基于TCP的RPC框架

    xrpc基于TCP的RPC框架RPC框架包含几大部分一,通讯1,协议:TCP和HTTP2的优劣TCP包较小,有更好的传输速率,但易用性没有HTTP2好HTTP2虽然会稍微占一些流量,但由于其具有一些重叠,头部压缩等新特性,速度应该并不...

    基于socket的rpc

    在本文中,我们将探讨基于Socket的RPC实现以及后续的改进计划,包括升级到Netty作为底层通信库,以及引入Zookeeper作为注册中心。 首先,让我们深入了解基于Socket的RPC工作原理。在客户端,我们定义了一个接口,...

    基于springboot的rpc服务

    SpringBoot以其简洁、快速的特性,成为了现代Java开发中的首选框架,它使得构建基于RPC的服务变得非常简单。本教程将深入探讨如何在SpringBoot中实现RPC服务。 首先,理解RPC的基本概念是必要的。RPC允许服务提供者...

    RPC-使用python代码基于gevent实现。

    根据rpc协议的思想,使用python的协程gevent实现的一个基于tcp,只能python使用的rpc协议, 不能夸语言, 不过不需要写额外的比如protobuf协议。 通过把类实例化后put到一个自定义对象中,实现方法的注册。 客户端...

    Mina实现RPC的例子

    Apache Mina是一个开源项目,它提供了一个高度可扩展且高性能的网络通信框架,支持多种协议,如TCP、UDP等。 首先,理解Mina的核心概念是必要的。Mina基于事件驱动和异步模型,允许开发者创建高效的网络应用。其...

    java实现rpc框架

    例如,可以使用HTTP/1.1或HTTP/2,结合RESTful API设计原则,实现基于HTTP的RPC通信。Java的HttpURLConnection类和HttpClient库(如Apache HttpClient)可以方便地处理HTTP请求和响应。 综上所述,构建一个Java实现...

    rpc原理的简单实现

    这里给出一个简单的RPC实现示例,使用Python的socket库: ```python # 服务器端 import socket def remote_service(data): return "Service result: " + data s = socket.socket(socket.AF_INET, socket.SOCK_...

    tocol:提供TCP复用的RPC调用

    TCP连接使用策略是基于TCP连接最近使用时间来判断的 关于Protocol 目前实现的协议有java自带的二进制协议和hessian协议 协议是可扩展的 关于传输协议 该框架使用自定义协议,头四个字节表示数总长度,第五个字节表示...

    《netty实战》http协议、自定义协议、自定义RPC模块学习源码.zip

    自定义协议是指根据特定需求设计的通信协议,它可能基于TCP、UDP或者其他传输层协议。在Netty中,你可以通过定义ByteToMessageDecoder和MessageToByteEncoder等编解码器来处理自定义协议的数据帧。这些编解码器允许...

Global site tag (gtag.js) - Google Analytics