`
zhangwei_david
  • 浏览: 475889 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

实现RPC就是这么简单 之 Netty 实现

    博客分类:
  • Java
 
阅读更多

 

ServiceServer实现自动发现服务:

/**
 *
 * @author zhangwei
 * @version $Id: NettyServiceServer.java, v 0.1 2015年8月19日 下午2:08:37  $
 */
public class NettyServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {

    /**服务端口号**/
    private int              port = 12000;

    private RpcServerHandler rpcServerHandler;

    private void publishedService() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(5);
        EventLoopGroup workerGroup = new NioEventLoopGroup(5);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline().addLast(new RpcDecoder(SrpcRequest.class))
                    .addLast(new RpcEncoder(SrpcResponse.class)).addLast(rpcServerHandler);
                }
            });

            //绑定主机+端口
            ChannelFuture future = serverBootstrap.bind("127.0.0.1", port).sync();

            // 等待服务监听端口关闭
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        publishedService();
    }

    /**
     * @see org.springframework.context.Lifecycle#start()
     */
    @Override
    public void start() {
    }

    /**
     * @see org.springframework.context.Lifecycle#stop()
     */
    @Override
    public void stop() {

    }

    /**
     * @see org.springframework.context.Lifecycle#isRunning()
     */
    @Override
    public boolean isRunning() {
        return false;
    }

    /**
     * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
        Map<String, Object> handlerMap = new HashMap<String, Object>();
        if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
                        .getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
        System.out.println("自动注册的服务SRPC 服务有:" + handlerMap.keySet());
        rpcServerHandler = new RpcServerHandler(handlerMap);
    }
}

 注意:在该类上添加了@Sharable注解,如果没有改注解则该ChannelHandler不允许多次读写

Netty将不会再同步地调用ChannelHandler的方法了,除非ChannelHandler由@Shareable注解

/**
 *
 * @author zhangwei_PF
 * @version $Id: RpcServerHandler.java, v 0.1 2015年8月19日 下午2:17:34  $
 */
@Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> {

    private final Map<String, Object> handlerMap;

    public RpcServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    /**
     * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, java.lang.Object)
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest srpcRequest)
            throws Exception {

        SrpcResponse response = new SrpcResponse();
        response.setRequestId(srpcRequest.getRequestId());
        try {
            response.setResult(handle(srpcRequest));
        } catch (Exception e) {
            response.setError(e);
            e.printStackTrace();
        }

        ctx.writeAndFlush(response);
    }

    /**
     *
     * @param srpcRequest
     * @return
     * @throws Exception
     */
    private Object handle(SrpcRequest request) throws Exception {
        Object service = handlerMap.get(request.getInterfaceName());

        Method method = service.getClass().getMethod(request.getMethodName(),
            request.getParameterTypes());

        return method.invoke(service, request.getParameters());

    }

}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: RpcEncoder.java, v 0.1 2014年12月31日 下午8:55:25 zhangwei_david Exp $
 */
@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serializer(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: RpcDecoder.java, v 0.1 2014年12月31日 下午8:53:16 zhangwei_david Exp $
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserializer(data, genericClass);
        out.add(obj);
    }

 

/**
 *
 * @author zhangwei_david
 * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $
 */
public class SrpcResponse implements Serializable {
    /**  */
    private static final long serialVersionUID = -5934073769679010930L;
    // 请求的Id
    private String            requestId;
    // 异常
    private Throwable         error;
    // 响应
    private Object            result;

    /**
     * Getter method for property <tt>requestId</tt>.
     *
     * @return property value of requestId
     */
    public String getRequestId() {
        return requestId;
    }

    /**
     * Setter method for property <tt>requestId</tt>.
     *
     * @param requestId value to be assigned to property requestId
     */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /**
     * Getter method for property <tt>error</tt>.
     *
     * @return property value of error
     */
    public Throwable getError() {
        return error;
    }

    /**
     * Setter method for property <tt>error</tt>.
     *
     * @param error value to be assigned to property error
     */
    public void setError(Throwable error) {
        this.error = error;
    }

    /**
     * Getter method for property <tt>result</tt>.
     *
     * @return property value of result
     */
    public Object getResult() {
        return result;
    }

    /**
     * Setter method for property <tt>result</tt>.
     *
     * @param result value to be assigned to property result
     */
    public void setResult(Object result) {
        this.result = result;
    }

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result
                + "]";
    }

}

 

/**
 *
 * @author zhangwei_david
 * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $
 */
public class SrpcRequest implements Serializable {

    /**  */
    private static final long serialVersionUID = 6132853628325824727L;
    // 请求Id
    private String            requestId;
    // 远程调用接口名称
    private String            interfaceName;
    //远程调用方法名称
    private String            methodName;
    // 参数类型
    private Class<?>[]        parameterTypes;
    // 参数值
    private Object[]          parameters;

    /**
     * Getter method for property <tt>requestId</tt>.
     *
     * @return property value of requestId
     */
    public String getRequestId() {
        return requestId;
    }

    /**
     * Setter method for property <tt>requestId</tt>.
     *
     * @param requestId value to be assigned to property requestId
     */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /**
     * Getter method for property <tt>interfaceName</tt>.
     *
     * @return property value of interfaceName
     */
    public String getInterfaceName() {
        return interfaceName;
    }

    /**
     * Setter method for property <tt>interfaceName</tt>.
     *
     * @param interfaceName value to be assigned to property interfaceName
     */
    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    /**
     * Getter method for property <tt>methodName</tt>.
     *
     * @return property value of methodName
     */
    public String getMethodName() {
        return methodName;
    }

    /**
     * Setter method for property <tt>methodName</tt>.
     *
     * @param methodName value to be assigned to property methodName
     */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * Getter method for property <tt>parameterTypes</tt>.
     *
     * @return property value of parameterTypes
     */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Setter method for property <tt>parameterTypes</tt>.
     *
     * @param parameterTypes value to be assigned to property parameterTypes
     */
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * Getter method for property <tt>parameters</tt>.
     *
     * @return property value of parameters
     */
    public Object[] getParameters() {
        return parameters;
    }

    /**
     * Setter method for property <tt>parameters</tt>.
     *
     * @param parameters value to be assigned to property parameters
     */
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName
                + ", methodName=" + methodName + ", parameterTypes="
                + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters)
                + "]";
    }

}

 

/**
 *
 * @author zhangwei_PF
 * @version $Id: SrpcRequestSender.java, v 0.1 2015年8月20日 下午2:13:31  $
 */
@Sharable
public class SrpcRequestSender extends SimpleChannelInboundHandler<SrpcResponse> {

    //final CountDownLatch latch = new CountDownLatch(1);

    private BlockingQueue<SrpcResponse> responseHodler = new LinkedBlockingQueue<SrpcResponse>(1);

    //   private SrpcResponse                response;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, SrpcResponse response) throws Exception {
        //  this.response = response;
        // latch.countDown();
        responseHodler.put(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public SrpcResponse send(SrpcRequest request, String host, int port) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new RpcEncoder(SrpcRequest.class))
                            .addLast(new RpcDecoder(SrpcResponse.class))
                            .addLast(SrpcRequestSender.this);
                    }

                });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            Channel channel = future.channel();

            channel.writeAndFlush(request).sync();
            /**
             *
             * 使用闭锁实现等待
             */
            //  latch.await();
            SrpcResponse response = responseHodler.take();
            System.out.println("send request is " + request + "receive response is " + response);
            channel.closeFuture();
            return response;
        } finally {
            group.shutdownGracefully();
        }

    }

}

 

/**
 *
 * @author zhangwei_PF
 * @version $Id: RpcClientProxy.java, v 0.1 2015年8月19日 下午6:01:26  $
 */
public class SrpcProxyFactory {

    /**
     * 引用服务
     *
     * @param <T> 接口泛型
     * @param interfaceClass 接口类型
     * @param host 服务器主机名
     * @param port 服务器端口
     * @return 远程服务
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T create(final Class<T> interfaceClass, final String host, final int port)
                                                                                                throws Exception {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("必须指定服务接口");
        }

        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("必须指定服务器的地址和端口号");
        }

        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                                                                                     throws Throwable {

                    SrpcRequest request = new SrpcRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setInterfaceName(interfaceClass.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(arguments);
                    SrpcResponse response = new SrpcRequestSender().send(request, host, port);

                    if (response == null
                        || !StringUtils.equals(request.getRequestId(), response.getRequestId())) {
                        return null;
                    }
                    if (response.getError() != null) {
                        throw response.getError();
                    }
                    return response.getResult();

                }
            });

    }
}

 

1
3
分享到:
评论

相关推荐

    一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架。提供服务注册,发现,负载均衡,.zip

    标题中的“一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架”揭示了这个项目的核心技术栈,它整合了三个在分布式系统中广泛使用的开源库:Netty、Zookeeper和Spring。让我们逐一深入探讨这三个技术以及它们...

    基于Netty实现了dubbo rpc

    【标题】基于Netty实现Dubbo RPC 在分布式系统中,RPC(Remote Procedure Call)是一种常见且重要的通信机制,它允许一个程序调用另一个在不同进程中运行的程序,就像调用本地函数一样简单。Dubbo作为阿里巴巴开源...

    netty的rpc协议的简单实现

    通过以上步骤,我们可以使用Netty实现一个简单的RPC框架。在实际应用中,还需要考虑安全性、性能优化、服务治理等多个方面。项目的源代码“rpc-demo”提供了具体的实现细节,包括服务端、客户端、编解码器以及相关...

    基于netty实现的支持长连接的rpc

    标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...

    simple-rpc是一款基于netty的RPC框架

    **简单RPC:基于Netty的高效RPC框架** `Simple-RPC`是一个专为Java开发者设计的轻量级RPC(远程过程调用)框架,它利用了高性能的网络库`Netty`来实现实时、高效的远程通信。RPC框架在分布式系统中起到了关键作用,...

    使用netty自定义rpc通信框架

    - `rpc_netty`:这个目录可能包含了使用Netty实现的RPC框架核心代码。 - `readme.txt`:项目说明文件,详细介绍了如何使用这个RPC框架。 - `rpc_web_test`:可能是一个测试示例,展示如何在Web应用中集成和使用这个...

    基于Zookeeper+Netty+Protostuff实现的简单RPC框架源码

    基于Zookeeper+Netty+Protostuff实现的简单RPC框架源码,代码内有详细注释

    NettyRpc:一个基于Netty,ZooKeeper和Spring的简单RPC框架

    NettyRpc 基于Netty,ZooKeeper和Spring的RPC框架中文详情: 特征: 简单的代码和框架 ZooKeeper的服务注册表/发现支持 高可用性,负载平衡和故障转移 支持不同的负载均衡策略 支持异步/同步调用 支持不同版本的...

    一个基于Nacos、Netty、Protobuf 实现的简单易懂的RCP框架.zip

    开发者可以克隆或下载这个仓库,然后在本地环境中编译和运行项目,以了解和学习如何利用Nacos、Netty和Protobuf实现RPC框架。 这个框架的实现对于学习分布式系统、网络编程以及序列化技术非常有帮助。开发者可以...

    guide-rpc-framework:由Netty + Kyro + Zookeeper实现的自定义RPC框架。(基于Netty + Kyro + Zookeeper实现的自定义RPC框架-附加详细实现过程和相关教程。)

    guide 目前只实现了RPC框架最基本的功能,一些可优化点都在下面提到了,有兴趣的小伙伴可以自我完善。 通过这个简易的轮子,你可以学到RPC的替代原理和原理以及各种Java编码实践的运用。 你甚至可以把当做你的毕设/...

    如何用Netty写一个自己的RPC框架

    Netty是一个基于Java的高性能、高可靠性的网络应用程序框架,广泛用于实现各种网络协议,尤其是在需要处理大量并发连接的场景下。RPC(Remote Procedure Call)即远程过程调用,它是一种计算机通信协议。该协议允许...

    netty实现自定义rpc.zip

    本项目"Netty实现自定义RPC"旨在教你如何利用Netty构建自己的RPC框架。 首先,我们需要理解RPC的基本原理。RPC的核心是将远程服务调用的过程透明化,使它看起来就像是本地方法调用一样。这涉及到序列化、网络通信和...

    用Netty实现一个简单的RPC框架

    用Netty实现一个简单的RPC框架,基本上rpc主要的知识点都涉及到了,包括协议的定义,序列化反序列化,动态代理,Spring自动装配,Netty编解码器等。可以通过这个项目加强对Netty的学习掌握,也可以加深对RPC的理解。...

    【项目实战】Netty源码剖析&NIO;+Netty5各种RPC架构实战演练三部曲视频教程(未加密)

    ### Netty源码剖析与NIO及Netty5各种RPC架构实战演练三部曲知识点解析 #### 一、Netty概述 ...以上就是对Netty源码剖析及NIO与Netty5在RPC架构中的实战演练的知识点总结,希望能够对大家有所帮助。

    netty解读及如何设计一个简单的RPC框架

    在深入理解Netty的同时,设计一个简单的RPC(Remote Procedure Call)框架可以帮助我们更好地掌握网络通信的核心原理。 首先,让我们来探讨Netty的基础知识。Netty的核心概念包括BossGroup和WorkerGroup,这两个都...

    基于Java实现的基于netty轻量的高性能分布式RPC服务框架

    【作品名称】:基于Java实现的基于netty轻量的高性能分布式RPC服务框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】...

    netty RPC

    Netty RPC 是一个基于 Netty 框架实现的远程过程调用(RPC)系统,它允许分布式系统中的不同节点之间高效地交换数据。RPC 的核心思想是让开发者能够像调用本地方法一样调用远程服务,从而简化分布式系统的开发。在本...

    消息推送中台websocket部分 + netty 手写 RPC

    本项目结合了WebSocket与Netty,旨在深入理解消息推送中台的实现原理以及RPC的底层机制。 首先,让我们详细讨论WebSocket。WebSocket是一种在客户端和服务器之间建立长连接的协议,与传统的HTTP协议不同,它允许...

    Netty进阶之路-跟着案例学Netty

    接着,通过一系列逐步进阶的案例,如构建简单的Echo服务器、处理HTTP请求、实现WebSocket通信,读者将深入理解如何使用Netty进行网络编程。 在处理TCP连接方面,书中会详细讲解如何建立、管理和关闭连接,以及如何...

    oh-netty-rpc:netty4构建一个简单的rpc服务器和客户端

    oh-netty-rpc-client : 基于netty实现的RPC client,使用JDK动态代理实现RPC client,隐藏了底层实现细节,使服务调用看起来像是本地调用(网络通讯、编解码、远程调用) oh-netty-rpc-protocol : 封装了RPC通讯之间...

Global site tag (gtag.js) - Google Analytics