`
xulang
  • 浏览: 4062 次
文章分类
社区版块
存档分类
最新评论

基于Netty的异步Rpc调用的小框架

阅读更多

基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。

 

客户端与服务端通信的类

package cc.ymsoft.Framework;

import java.io.Serializable;

@SuppressWarnings("serial")
public class MethodAndArgs implements Serializable{
	private String methodName;//调用的方法名称
	private Class<?>[] types;//参数类型
	private Object[] objects;//参数列表
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public Class<?>[] getTypes() {
		return types;
	}
	public void setTypes(Class<?>[] types) {
		this.types = types;
	}
	public Object[] getObjects() {
		return objects;
	}
	public void setObjects(Object[] objects) {
		this.objects = objects;
	}
	public MethodAndArgs() {
		super();
		// TODO Auto-generated constructor stub
	}
	public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) {
		
		this.methodName = methodName;
		this.types = types;
		this.objects = objects;
	}
	
	
}

 

 框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)

/**
 * @author xulang
 */
package cc.ymsoft.Framework;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * 服务端处理
 * @author hadoop
 *
 */
 class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
    private Object obj;
    private Object response;
    
    public TcpServerHandler(Object obj) {
		super();
		this.obj = obj;
		
	}
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // TODO Auto-generated method stub
		MethodAndArgs methodAndArgs=(MethodAndArgs) msg;
		Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes());
		ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects()));
        ctx.close();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	System.out.println("client die");
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("channelActive>>>>>>>>");
        ctx.writeAndFlush("调用异常");
        ctx.close();
    }
      @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("服务器异常");
    }
}
/**
 * 客户端处理
 * @author hadoop
 *
 */
 class TcpClientHander extends ChannelInboundHandlerAdapter {
	 private Object response;
	 
		public Object getResponse() {
		return response;
	}

		@Override
	    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			response=msg;
	        System.out.println("client接收到服务器返回的消息:" + msg);
	    }
	    
		@Override
	    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
	        System.out.println("client exception is general");
	    }
	}
 
public class RpcFramework {

	/**
	 * 服务注册
	 * @param obj 需要注册的服务对象
	 * @param port 端口
	 * @param ip 地址
	 * @throws InterruptedException 
	 */
	public static void regist(final Object obj,int port,String ip) throws InterruptedException {
		int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2;
		    
		int BIZTHREADSIZE = 100;
		 EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
		  EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
		if (obj == null)
			throw new IllegalArgumentException("对象不能为null");
		if (port <= 0 || port > 65535)
			throw new IllegalArgumentException("错误的端口" + port);
		 ServerBootstrap bootstrap = new ServerBootstrap();
	        bootstrap.group(bossGroup, workerGroup);
	        bootstrap.channel(NioServerSocketChannel.class);
	        bootstrap.childHandler(new ChannelInitializer<Channel>() {

	            @Override
	            protected void initChannel(Channel ch) throws Exception {
	                // TODO Auto-generated method stub
	                ChannelPipeline pipeline = ch.pipeline();
	                 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
	                    pipeline.addLast(new LengthFieldPrepender(4));
	                    pipeline.addLast("encoder", new ObjectEncoder());  
	                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
	                  //  pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
	                   // pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
	                    pipeline.addLast(new TcpServerHandler(obj));
	            }       
	        });
	          ChannelFuture f = bootstrap.bind(ip, port).sync();
	          f.channel().closeFuture().sync();
	          System.out.println("TCP服务器已启动");
	}
	@SuppressWarnings("unchecked")
	public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) {
		if (interfaceClass == null)
			throw new IllegalArgumentException("接口类型不能为空");
		if (!interfaceClass.isInterface())
			throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口");
		if (host == null || host.length() == 0)
			throw new IllegalArgumentException("目标主机不能为空");
		if (port <= 0 || port > 65535)
			throw new IllegalArgumentException("端口错误:" + port);
		
	return (T)	Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
			
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				
				MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args);
				 final TcpClientHander tcpClientHander=new TcpClientHander();
				  EventLoopGroup group = new NioEventLoopGroup();
		          try {
		              Bootstrap b = new Bootstrap();
		              b.group(group);
		         //     b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
		              b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
		             
		              b.handler(new ChannelInitializer<SocketChannel>() {
		                  @Override
		                  protected void initChannel(SocketChannel ch) throws Exception {
		                      ChannelPipeline pipeline = ch.pipeline();
		                      pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
		                      pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
		                      pipeline.addLast("encoder", new ObjectEncoder());  
		                      pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
		                      pipeline.addLast("handler",tcpClientHander);
		                  }
		              });
		              
		                   ChannelFuture f = b.connect(host, port).sync();    
		                
		                  f.channel().writeAndFlush(mArgs).sync();
		                   f.channel().closeFuture().sync();
		                 
		          } catch (Exception e) {

		          } finally {
		        	  
		              group.shutdownGracefully();
		          }
				return tcpClientHander.getResponse();
		     
				
			}
		});
	}
}

  测试

接口

package cc.ymsoft.test;

interface HelloService {
	String SayHello(String name);
}

 接口实现类

package cc.ymsoft.test;

public class HelloImp implements HelloService {

	@Override
	public String SayHello(String name) {
		// TODO Auto-generated method stub
		
		return "你好:"+name;
	}


}

 客户端

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloInvoke {
	
	public static void main(String[] args) throws Exception {
		final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717);
	
		
					System.out.println(helloService.SayHello("XL"));
				
		
	}

}

 服务端

 

package cc.ymsoft.test;

import cc.ymsoft.Framework.RpcFramework;

public class HelloPro {
	public static void main(String[] args) throws Exception {
		HelloService hello=new HelloImp();
		RpcFramework.regist(hello, 1717, "127.0.0.1");
	}
	
}

 

完整代码在github https://github.com/xulang/NettyRpc

分享到:
评论

相关推荐

    基于Netty的高性能RPC框架 .zip

    异步回调支持异步RPC调用,提升客户端并行吞吐量。 JMX监控提供JMX监控支持,加强服务端调用请求的分析统计能力。 过滤器链支持RPC请求过滤器链和监听器链,实现请求的链式处理。 版本历史 NettyRPC 1.0 基于...

    基于Netty重构RPC框架.rar

    1. **高性能**:Netty的异步模型和优化的缓冲区处理使得RPC调用更快,尤其适合大数据量和高并发场景。 2. **灵活性**:通过自定义Handler,可以轻松地添加或修改协议解析、序列化等环节,适应不同的RPC需求。 3. *...

    使用netty自定义rpc通信框架

    本项目是基于Netty 4.0实现的自定义RPC通信框架,旨在为Java开发者提供一种高效、灵活的远程服务调用解决方案。 首先,我们来深入理解Netty的基本概念。Netty的核心是其NIO(非阻塞I/O)模型,它使用了Reactor模式...

    simple-rpc是一款基于netty的RPC框架

    `Simple-RPC`作为一款基于Netty的RPC框架,旨在提供简单、高效的远程调用解决方案。通过其丰富的特性和对Spring框架的良好支持,开发者能够快速构建分布式系统,并享受到高性能、低延迟的网络通信体验。无论是小型...

    基于Netty的Rpc-SpringBoot-Starter.zip

    2. **服务消费**:服务消费者通过服务注册中心获取到服务提供者的地址信息,然后使用Netty客户端发送RPC调用请求。 3. **请求处理**:Netty服务器接收到请求后,会将其解码为业务对象,然后通过Spring的AOP机制找到...

    基于Netty实现了dubbo rpc

    而Netty则是一款高效的异步事件驱动网络应用框架,常用于构建高并发、高性能的网络服务器。本项目结合两者,利用Netty实现了Dubbo的RPC通信机制。 首先,理解Netty的工作原理是关键。Netty基于IO多路复用模型,如...

    基于netty的手写rpc框架

    在本项目中,我们将基于Netty实现一个手写的RPC框架。Netty是Java领域的一个高性能、异步事件驱动的网络应用程序框架,常用于构建高效的服务器和客户端。 首先,我们需要理解RPC框架的基本组成部分: 1. **服务...

    BootNettyRpc是一个采用Netty实现的Rpc框架适用于SpringBoot项目支持SpringCloud

    Netty是一款基于NIO(非阻塞I/O)的异步事件驱动的网络通信框架,常用于构建高度可扩展的网络应用。在RPC框架中,Netty可以提供快速的数据传输和高效的连接管理,为微服务架构中的服务间通信提供了强大的基础。 ...

    netty自定义rpc实现

    综上所述,实现一个基于Netty的自定义RPC框架,需要理解Netty的异步I/O模型,设计合理的RPC通信协议,利用Zookeeper进行服务注册与发现,同时考虑服务的高可用性和性能优化。通过分析提供的压缩包文件,我们可以深入...

    如何用Netty写一个自己的RPC框架

    7. 序列化与反序列化:在进行RPC调用时,需要将对象序列化成可以在网络上传输的格式,比如JSON、XML或者更高效的二进制格式。在远程机器接收到数据后,需要将这些数据反序列化成原始的对象。常见的序列化框架包括...

    基于Netty框架的RPC服务.zip

    本项目是一个基于Netty框架实现的RPC(远程过程调用)服务。Netty是一个高性能、异步事件驱动的网络应用框架,适用于快速开发可维护的高性能协议服务器和客户端。本项目利用Netty的强大功能,实现了RPC服务的客户端...

    基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java ).zip

    标题中的“基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java )”指的是一个开发项目,该项目利用了Apache Spark的Netty RPC(远程过程调用)机制,并以此为基础创建了一个新的RPC框架。...

    minidubbo:基于Netty的完整RPC框架

    《基于Netty的完整RPC框架:Minidubbo详解》 RPC(Remote Procedure Call)远程过程调用,是一种在分布式环境中实现程序间通信的技术。它使得客户端可以像调用本地方法一样调用远程服务器上的方法,极大地简化了...

    NettyRPC-master

    NettyRPC-master是一个基于Netty框架实现的远程过程调用(RPC)系统。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发可维护的高性能协议服务器和客户端。RPC(Remote Procedure Call)是一种允许程序在...

    一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架。提供服务注册,发现,负载均衡,.zip

    标题中的“一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架”揭示了这个项目的核心技术栈,它整合了三个在分布式系统中广泛使用的开源库:Netty、Zookeeper和Spring。让我们逐一深入探讨这三个技术以及它们...

    基于netty实现的支持长连接的rpc

    标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...

    netty-rpc-master.zip

    Netty-RPC是一个基于Netty、Zookeeper和Protostuff构建的简单RPC(远程过程调用)框架,它允许应用程序在分布式环境中透明地调用远程服务,如同调用本地方法一样便捷。本文将详细探讨这三个关键组件在RPC框架中的...

    基于Java实现的基于netty轻量的高性能分布式RPC服务框架

    【作品名称】:基于Java实现的基于netty轻量的高性能分布式RPC服务框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】...

    消息推送中台websocket部分 + netty 手写 RPC

    接下来,我们转向Netty,这是一个用Java编写的异步事件驱动的网络应用程序框架,特别适合用于构建高效的RPC系统。Netty提供了高度可定制的ChannelHandler,通过它们可以处理进来的连接请求、数据传输和异常情况。在...

    一种基于NETTY的远程过程调用方法.pdf

    该发明的优点在于,通过Netty的异步非阻塞I/O特性,实现了高效的RPC调用,减少了网络通信的等待时间。同时,利用容器管理服务接口与实现的映射,使得服务发现和调用更加便捷,增强了系统的可扩展性和灵活性。此外,...

Global site tag (gtag.js) - Google Analytics