基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手。
客户端与服务端通信的类
package cc.ymsoft.Framework; import java.io.Serializable; @SuppressWarnings("serial") public class MethodAndArgs implements Serializable{ private String methodName;//调用的方法名称 private Class<?>[] types;//参数类型 private Object[] objects;//参数列表 public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getTypes() { return types; } public void setTypes(Class<?>[] types) { this.types = types; } public Object[] getObjects() { return objects; } public void setObjects(Object[] objects) { this.objects = objects; } public MethodAndArgs() { super(); // TODO Auto-generated constructor stub } public MethodAndArgs(String methodName, Class<?>[] types, Object[] objects) { this.methodName = methodName; this.types = types; this.objects = objects; } }
框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类)
/** * @author xulang */ package cc.ymsoft.Framework; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * 服务端处理 * @author hadoop * */ class TcpServerHandler extends ChannelInboundHandlerAdapter { private Object obj; private Object response; public TcpServerHandler(Object obj) { super(); this.obj = obj; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub MethodAndArgs methodAndArgs=(MethodAndArgs) msg; Method method=obj.getClass().getMethod(methodAndArgs.getMethodName(), methodAndArgs.getTypes()); ctx.writeAndFlush(method.invoke(obj, methodAndArgs.getObjects())); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("client die"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("channelActive>>>>>>>>"); ctx.writeAndFlush("调用异常"); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务器异常"); } } /** * 客户端处理 * @author hadoop * */ class TcpClientHander extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() { return response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response=msg; System.out.println("client接收到服务器返回的消息:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception is general"); } } public class RpcFramework { /** * 服务注册 * @param obj 需要注册的服务对象 * @param port 端口 * @param ip 地址 * @throws InterruptedException */ public static void regist(final Object obj,int port,String ip) throws InterruptedException { int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; int BIZTHREADSIZE = 100; EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); if (obj == null) throw new IllegalArgumentException("对象不能为null"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("错误的端口" + port); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); // pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler(obj)); } }); ChannelFuture f = bootstrap.bind(ip, port).sync(); f.channel().closeFuture().sync(); System.out.println("TCP服务器已启动"); } @SuppressWarnings("unchecked") public static <T>T getObj(Class<T> interfaceClass,final String host,final int port) { if (interfaceClass == null) throw new IllegalArgumentException("接口类型不能为空"); if (!interfaceClass.isInterface()) throw new IllegalArgumentException("类名" + interfaceClass.getName() + "必须是接口"); if (host == null || host.length() == 0) throw new IllegalArgumentException("目标主机不能为空"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("端口错误:" + port); return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodAndArgs mArgs=new MethodAndArgs(method.getName(), method.getParameterTypes(), args); final TcpClientHander tcpClientHander=new TcpClientHander(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group); // b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true); b.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast("handler",tcpClientHander); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().writeAndFlush(mArgs).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { } finally { group.shutdownGracefully(); } return tcpClientHander.getResponse(); } }); } }
测试
接口
package cc.ymsoft.test; interface HelloService { String SayHello(String name); }
接口实现类
package cc.ymsoft.test; public class HelloImp implements HelloService { @Override public String SayHello(String name) { // TODO Auto-generated method stub return "你好:"+name; } }
客户端
package cc.ymsoft.test; import cc.ymsoft.Framework.RpcFramework; public class HelloInvoke { public static void main(String[] args) throws Exception { final HelloService helloService = RpcFramework.getObj(HelloService.class, "127.0.0.1", 1717); System.out.println(helloService.SayHello("XL")); } }
服务端
package cc.ymsoft.test; import cc.ymsoft.Framework.RpcFramework; public class HelloPro { public static void main(String[] args) throws Exception { HelloService hello=new HelloImp(); RpcFramework.regist(hello, 1717, "127.0.0.1"); } }
完整代码在github https://github.com/xulang/NettyRpc
相关推荐
异步回调支持异步RPC调用,提升客户端并行吞吐量。 JMX监控提供JMX监控支持,加强服务端调用请求的分析统计能力。 过滤器链支持RPC请求过滤器链和监听器链,实现请求的链式处理。 版本历史 NettyRPC 1.0 基于...
1. **高性能**:Netty的异步模型和优化的缓冲区处理使得RPC调用更快,尤其适合大数据量和高并发场景。 2. **灵活性**:通过自定义Handler,可以轻松地添加或修改协议解析、序列化等环节,适应不同的RPC需求。 3. *...
本项目是基于Netty 4.0实现的自定义RPC通信框架,旨在为Java开发者提供一种高效、灵活的远程服务调用解决方案。 首先,我们来深入理解Netty的基本概念。Netty的核心是其NIO(非阻塞I/O)模型,它使用了Reactor模式...
`Simple-RPC`作为一款基于Netty的RPC框架,旨在提供简单、高效的远程调用解决方案。通过其丰富的特性和对Spring框架的良好支持,开发者能够快速构建分布式系统,并享受到高性能、低延迟的网络通信体验。无论是小型...
2. **服务消费**:服务消费者通过服务注册中心获取到服务提供者的地址信息,然后使用Netty客户端发送RPC调用请求。 3. **请求处理**:Netty服务器接收到请求后,会将其解码为业务对象,然后通过Spring的AOP机制找到...
而Netty则是一款高效的异步事件驱动网络应用框架,常用于构建高并发、高性能的网络服务器。本项目结合两者,利用Netty实现了Dubbo的RPC通信机制。 首先,理解Netty的工作原理是关键。Netty基于IO多路复用模型,如...
在本项目中,我们将基于Netty实现一个手写的RPC框架。Netty是Java领域的一个高性能、异步事件驱动的网络应用程序框架,常用于构建高效的服务器和客户端。 首先,我们需要理解RPC框架的基本组成部分: 1. **服务...
Netty是一款基于NIO(非阻塞I/O)的异步事件驱动的网络通信框架,常用于构建高度可扩展的网络应用。在RPC框架中,Netty可以提供快速的数据传输和高效的连接管理,为微服务架构中的服务间通信提供了强大的基础。 ...
综上所述,实现一个基于Netty的自定义RPC框架,需要理解Netty的异步I/O模型,设计合理的RPC通信协议,利用Zookeeper进行服务注册与发现,同时考虑服务的高可用性和性能优化。通过分析提供的压缩包文件,我们可以深入...
7. 序列化与反序列化:在进行RPC调用时,需要将对象序列化成可以在网络上传输的格式,比如JSON、XML或者更高效的二进制格式。在远程机器接收到数据后,需要将这些数据反序列化成原始的对象。常见的序列化框架包括...
本项目是一个基于Netty框架实现的RPC(远程过程调用)服务。Netty是一个高性能、异步事件驱动的网络应用框架,适用于快速开发可维护的高性能协议服务器和客户端。本项目利用Netty的强大功能,实现了RPC服务的客户端...
标题中的“基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java )”指的是一个开发项目,该项目利用了Apache Spark的Netty RPC(远程过程调用)机制,并以此为基础创建了一个新的RPC框架。...
《基于Netty的完整RPC框架:Minidubbo详解》 RPC(Remote Procedure Call)远程过程调用,是一种在分布式环境中实现程序间通信的技术。它使得客户端可以像调用本地方法一样调用远程服务器上的方法,极大地简化了...
NettyRPC-master是一个基于Netty框架实现的远程过程调用(RPC)系统。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发可维护的高性能协议服务器和客户端。RPC(Remote Procedure Call)是一种允许程序在...
标题中的“一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架”揭示了这个项目的核心技术栈,它整合了三个在分布式系统中广泛使用的开源库:Netty、Zookeeper和Spring。让我们逐一深入探讨这三个技术以及它们...
标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...
Netty-RPC是一个基于Netty、Zookeeper和Protostuff构建的简单RPC(远程过程调用)框架,它允许应用程序在分布式环境中透明地调用远程服务,如同调用本地方法一样便捷。本文将详细探讨这三个关键组件在RPC框架中的...
【作品名称】:基于Java实现的基于netty轻量的高性能分布式RPC服务框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】...
接下来,我们转向Netty,这是一个用Java编写的异步事件驱动的网络应用程序框架,特别适合用于构建高效的RPC系统。Netty提供了高度可定制的ChannelHandler,通过它们可以处理进来的连接请求、数据传输和异常情况。在...
该发明的优点在于,通过Netty的异步非阻塞I/O特性,实现了高效的RPC调用,减少了网络通信的等待时间。同时,利用容器管理服务接口与实现的映射,使得服务发现和调用更加便捷,增强了系统的可扩展性和灵活性。此外,...