ServiceServer实现自动发现服务:
/** * * @author zhangwei * @version $Id: NettyServiceServer.java, v 0.1 2015年8月19日 下午2:08:37 $ */ public class NettyServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private RpcServerHandler rpcServerHandler; private void publishedService() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(5); EventLoopGroup workerGroup = new NioEventLoopGroup(5); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new RpcDecoder(SrpcRequest.class)) .addLast(new RpcEncoder(SrpcResponse.class)).addLast(rpcServerHandler); } }); //绑定主机+端口 ChannelFuture future = serverBootstrap.bind("127.0.0.1", port).sync(); // 等待服务监听端口关闭 future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); Map<String, Object> handlerMap = new HashMap<String, Object>(); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } System.out.println("自动注册的服务SRPC 服务有:" + handlerMap.keySet()); rpcServerHandler = new RpcServerHandler(handlerMap); } }
注意:在该类上添加了@Sharable注解,如果没有改注解则该ChannelHandler不允许多次读写
Netty将不会再同步地调用ChannelHandler的方法了,除非ChannelHandler由@Shareable注解
/** * * @author zhangwei_PF * @version $Id: RpcServerHandler.java, v 0.1 2015年8月19日 下午2:17:34 $ */ @Sharable public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> { private final Map<String, Object> handlerMap; public RpcServerHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } /** * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest srpcRequest) throws Exception { SrpcResponse response = new SrpcResponse(); response.setRequestId(srpcRequest.getRequestId()); try { response.setResult(handle(srpcRequest)); } catch (Exception e) { response.setError(e); e.printStackTrace(); } ctx.writeAndFlush(response); } /** * * @param srpcRequest * @return * @throws Exception */ private Object handle(SrpcRequest request) throws Exception { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); return method.invoke(service, request.getParameters()); } }
/** * * @author zhangwei_david * @version $Id: RpcEncoder.java, v 0.1 2014年12月31日 下午8:55:25 zhangwei_david Exp $ */ @SuppressWarnings("rawtypes") public class RpcEncoder extends MessageToByteEncoder { private Class<?> genericClass; public RpcEncoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { if (genericClass.isInstance(in)) { byte[] data = SerializationUtil.serializer(in); out.writeInt(data.length); out.writeBytes(data); } } }
/** * * @author zhangwei_david * @version $Id: RpcDecoder.java, v 0.1 2014年12月31日 下午8:53:16 zhangwei_david Exp $ */ public class RpcDecoder extends ByteToMessageDecoder { private Class<?> genericClass; public RpcDecoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { in.resetReaderIndex(); } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtil.deserializer(data, genericClass); out.add(obj); }
/** * * @author zhangwei_david * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ */ public class SrpcResponse implements Serializable { /** */ private static final long serialVersionUID = -5934073769679010930L; // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } /** * @see java.lang.Object#toString() */ @Override public String toString() { return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result + "]"; } }
/** * * @author zhangwei_david * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ */ public class SrpcRequest implements Serializable { /** */ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>interfaceName</tt>. * * @return property value of interfaceName */ public String getInterfaceName() { return interfaceName; } /** * Setter method for property <tt>interfaceName</tt>. * * @param interfaceName value to be assigned to property interfaceName */ public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. * * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. * * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } /** * @see java.lang.Object#toString() */ @Override public String toString() { return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName + ", methodName=" + methodName + ", parameterTypes=" + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters) + "]"; } }
/** * * @author zhangwei_PF * @version $Id: SrpcRequestSender.java, v 0.1 2015年8月20日 下午2:13:31 $ */ @Sharable public class SrpcRequestSender extends SimpleChannelInboundHandler<SrpcResponse> { //final CountDownLatch latch = new CountDownLatch(1); private BlockingQueue<SrpcResponse> responseHodler = new LinkedBlockingQueue<SrpcResponse>(1); // private SrpcResponse response; @Override public void channelRead0(ChannelHandlerContext ctx, SrpcResponse response) throws Exception { // this.response = response; // latch.countDown(); responseHodler.put(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } public SrpcResponse send(SrpcRequest request, String host, int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RpcEncoder(SrpcRequest.class)) .addLast(new RpcDecoder(SrpcResponse.class)) .addLast(SrpcRequestSender.this); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); Channel channel = future.channel(); channel.writeAndFlush(request).sync(); /** * * 使用闭锁实现等待 */ // latch.await(); SrpcResponse response = responseHodler.take(); System.out.println("send request is " + request + "receive response is " + response); channel.closeFuture(); return response; } finally { group.shutdownGracefully(); } } }
/** * * @author zhangwei_PF * @version $Id: RpcClientProxy.java, v 0.1 2015年8月19日 下午6:01:26 $ */ public class SrpcProxyFactory { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T create(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); SrpcResponse response = new SrpcRequestSender().send(request, host, port); if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) { return null; } if (response.getError() != null) { throw response.getError(); } return response.getResult(); } }); } }
相关推荐
标题中的“一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架”揭示了这个项目的核心技术栈,它整合了三个在分布式系统中广泛使用的开源库:Netty、Zookeeper和Spring。让我们逐一深入探讨这三个技术以及它们...
【标题】基于Netty实现Dubbo RPC 在分布式系统中,RPC(Remote Procedure Call)是一种常见且重要的通信机制,它允许一个程序调用另一个在不同进程中运行的程序,就像调用本地函数一样简单。Dubbo作为阿里巴巴开源...
通过以上步骤,我们可以使用Netty实现一个简单的RPC框架。在实际应用中,还需要考虑安全性、性能优化、服务治理等多个方面。项目的源代码“rpc-demo”提供了具体的实现细节,包括服务端、客户端、编解码器以及相关...
标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...
**简单RPC:基于Netty的高效RPC框架** `Simple-RPC`是一个专为Java开发者设计的轻量级RPC(远程过程调用)框架,它利用了高性能的网络库`Netty`来实现实时、高效的远程通信。RPC框架在分布式系统中起到了关键作用,...
- `rpc_netty`:这个目录可能包含了使用Netty实现的RPC框架核心代码。 - `readme.txt`:项目说明文件,详细介绍了如何使用这个RPC框架。 - `rpc_web_test`:可能是一个测试示例,展示如何在Web应用中集成和使用这个...
基于Zookeeper+Netty+Protostuff实现的简单RPC框架源码,代码内有详细注释
NettyRpc 基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 ZooKeeper的服务注册表/发现支持 高可用性,负载平衡和故障转移 支持不同的负载均衡策略 支持异步/同步调用 支持不同版本的...
guide 目前只实现了RPC框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自我完善。 通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/...
Netty是一个基于Java的高性能、高可靠性的网络应用程序框架,广泛用于实现各种网络协议,尤其是在需要处理大量并发连接的场景下。RPC(Remote Procedure Call)即远程过程调用,它是一种计算机通信协议。该协议允许...
开发者可以克隆或下载这个仓库,然后在本地环境中编译和运行项目,以了解和学习如何利用Nacos、Netty和Protobuf实现RPC框架。 这个框架的实现对于学习分布式系统、网络编程以及序列化技术非常有帮助。开发者可以...
本项目"Netty实现自定义RPC"旨在教你如何利用Netty构建自己的RPC框架。 首先,我们需要理解RPC的基本原理。RPC的核心是将远程服务调用的过程透明化,使它看起来就像是本地方法调用一样。这涉及到序列化、网络通信和...
用Netty实现一个简单的RPC框架,基本上rpc主要的知识点都涉及到了,包括协议的定义,序列化反序列化,动态代理,Spring自动装配,Netty编解码器等。可以通过这个项目加强对Netty的学习掌握,也可以加深对RPC的理解。...
### Netty源码剖析与NIO及Netty5各种RPC架构实战演练三部曲知识点解析 #### 一、Netty概述 ...以上就是对Netty源码剖析及NIO与Netty5在RPC架构中的实战演练的知识点总结,希望能够对大家有所帮助。
在深入理解Netty的同时,设计一个简单的RPC(Remote Procedure Call)框架可以帮助我们更好地掌握网络通信的核心原理。 首先,让我们来探讨Netty的基础知识。Netty的核心概念包括BossGroup和WorkerGroup,这两个都...
【作品名称】:基于Java实现的基于netty轻量的高性能分布式RPC服务框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】...
Netty RPC 是一个基于 Netty 框架实现的远程过程调用(RPC)系统,它允许分布式系统中的不同节点之间高效地交换数据。RPC 的核心思想是让开发者能够像调用本地方法一样调用远程服务,从而简化分布式系统的开发。在本...
本项目结合了WebSocket与Netty,旨在深入理解消息推送中台的实现原理以及RPC的底层机制。 首先,让我们详细讨论WebSocket。WebSocket是一种在客户端和服务器之间建立长连接的协议,与传统的HTTP协议不同,它允许...
接着,通过一系列逐步进阶的案例,如构建简单的Echo服务器、处理HTTP请求、实现WebSocket通信,读者将深入理解如何使用Netty进行网络编程。 在处理TCP连接方面,书中会详细讲解如何建立、管理和关闭连接,以及如何...
oh-netty-rpc-client : 基于netty实现的RPC client,使用JDK动态代理实现RPC client,隐藏了底层实现细节,使服务调用看起来像是本地调用(网络通讯、编解码、远程调用) oh-netty-rpc-protocol : 封装了RPC通讯之间...