`
beck5859509
  • 浏览: 110343 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

netty解读及如何设计一个简单的RPC框架

 
阅读更多
引言

RPC框架对于一个公司的系统来讲应该是一种非常重要的基础能力,承载各系统之间的各种远程调用,比如公司的HSF、TR。如何也实现一个简单的RPC框架,这个通常需要借助一些网络通信框架来实现,自己基于socket的编写实现从时间成本,稳定性上来讲并不推荐。JAVA应用借助比较流行的netty或者JVM自带的RMI来实现,而C应用可以利用像libevent库进行构建。


netty的优势

netty作为后起之秀,借鉴了很多前者优秀的经验。它是基于java nio包扩展的一个高性能高并发的异步网络通信框架,对比原来的java io包,做了很多的改进。最大的变化在于编程模型的改变,原来的输入输出(inputstream/outputstream)对于一次操作来讲是单向的,只能进行读或写操作,需要内部构造两个stream才能完成。而nio则使用channel的方式,读写共用一个管道对象,每个连接等于一个管道(实际使用中连接同一远程服务的多个请求可以共用同一个管道,是不是很高效,后面会有介绍)。
注:本文所指的RPC并不是APP端向网关发起的请求,而是指后台各系统之间的调用。APP端的RPC是走http请求,再由网关经过后台RPC请求到各业务方后台。


工作原理
一般的tcp server开启服务大概是如下几个步骤:bind(port) listen() accept() read() write(),netty也采用此种方式。不过是采用NIO的多路复用Selector模式来实现。参考以下NIO的实例。
        selector.select();
        Set selectedKeys = selector.selectedKeys();
        Iterator keyIterator = selectedKeys.iterator();
        while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
        } else if (key.isReadable()) {
            // a channel is ready for reading
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }

上述模型乍一看是一个同步阻塞的,但实际上netty做了改进程,由一个boss线程进行循环收集就绪的IO事件(OP_CONNECT,OP_READ,OP_WRITE,OP_ACCEPT),然后初始化成一个channel,并进行dispatch到worker线程处理(Reactor模式),从应用层上看则是一个异步非阻塞。

netty tcp server构造示例:由一个boss线程组和一个worker线程组构成
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {  
            ServerBootstrap b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

boss线程收集到就绪IO事件以后如何分发给worker线程组的?
可以参看NioServerSocketChannel.java

protected int doReadMessages(List<Object> buf) throws Exception {
    //收集IO事件,并构造成一个channel
    SocketChannel ch = javaChannel().accept();
    ...
boss线程中设置worker线程注册此channel,后续读写操作交由worker线程处理。ServerBootstrap.java
    try {
        childGroup.register(channel).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } 

childGroup是初始化NettyServer时传入的线程组,register方法里面做了比较多的设置,主要是针对channel的操作类ChannelHandler。



编码、解码及半包问题
上述的设计有点类似Serlvet中的filter,数据会传递到注册的每一个handler中,当然netty中是有顺序区分的。ChannelHandler的作用主要是解码、编码、数据转换、业务处理、写数据等,以下是一个经典的设置。
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();
    //解码
    pipeline.addLast("decoder", new ObjectDecoder());
    //编码
    pipeline.addLast("encoder", new ObjectEncoder());
    //业务处理
    pipeline.addLast("business", new BusinessHandler());
    ...

需要注意的是编解码类并不解决半包的问题,2种解决办法:

使用LengthFieldBasedFrameDecoder和LengthFieldPrepender,分别在构造函数里面设置长度字段。
自定义ChannelHandler,继承ByteToMessageDecoder,并实现decode方法。
decode()方法主要是从ByteBuf中收集数据,然后转换成Object并加到out这个List中,当然这个处理过程比较复杂,举一个场景:比如客户端发送了2048个字节,收包的缓冲区大小是1024,则下面的这个decode方法会执行两次。第一次收到的包会存储在cumulation的变量中,这就解释了为什么一般编解码器不标注@Sharable的原因。不同的channel不能共享cumulation这个ByteBuf数据。
    /*
     Decode the from one bytebuf to an other. This method will be called till either the input bytebuf has nothing to read when return from this method or till nothing was read from the input bytebuf 
     */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;


序列化
上面的编码、解码也可以理解成序列化,如果服务之间调用量不大,选择java默认的序列化或者json,对于调用量大,追求传输效率的可以选择hessian或者protobuf,tr调用中默认使用的是hessian。
public byte[] encode(Object obj) throws CodecException {
        ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(byteArray);
        output.setSerializerFactory(serializerFactory);
        try {
            output.writeObject(obj);
            output.close();
        } catch (IOException e) {
            throw new CodecException("IOException occurred when Hessian serializer encode!", e);
        }

protobuf方式序列化:
有一个需要注意的地方就是在channelHandler中设置pb encoder/decoder时只能指定一种decoder.

pipeline.addLast(new ProtobufDecoder(BussinessPB.getDefaultInstance()));
上面例子中相当于是proto文件被写死了,在数据的头部时传入业务的cmdCode或者type,动态去解析PB格式。



发送数据
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   ctx.channel().writeAndFlush(bytebuf).addListener(ChannelFutureListener.CLOSE);

上面的例子中发送完数据以后,会关闭连接。但实际应用中我们更希望连接能够重用,毕竟内部系统之间重复建连无法发挥企业内部的网络性能优势。这个地方可以构造一个conntionPool,这里不再敖述。

异步变同步

客户端发起一次调用,需要等待服务端的响应,因为nio是异步的,如何实现呢。
public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }


public void putResponse(RemotingCommand response) {
        this.responseCommand = (ResponseCommand) response;
        this.countDownLatch.countDown();
    }

client端发送完请求后,利用countDownLatch的特性阻塞当前线程,直到countDownLatch的值为0.也可以使用BlockingQueque,requestId对应一个BlockingQueque,接收数据时的线程put response到BlockingQueque,前面的线程take这个BlockingQueque



调用方式
对于调用方来讲,一般拿到的是服务方的接口。比如com.xxx.Service,使用动态代理的方式构建出一个com.xxx.Service的代理类,在反射的方法里面,进行RPC调用。从调用方视角上看,这完全是一个本地调用。比如:
LiveFacade service = RpcServiceFactory.getRpcService(LiveFacade.class, rpcClient);
service.doXXX();

对于服务提供方来讲,在应用启动的时候收集发布对外的服务,一般这个过程是结合Spring来做,调用方请求过来时,可以根据请求的方法,参数进行反射调用。



路由寻址
客户端调用时需要知道服务提供方的IP地址、端口号,这个一般是通过ConfigServer获取,这里不在敖述,参考下面的流程图,如果需要自己实现一个ConfigServer,借助zookeeper来实现。





  • 大小: 159.2 KB
分享到:
评论

相关推荐

    netty权威指南 第二版 李林锋pdf

    这本书针对Netty5版本的源码进行了详尽的解读,旨在帮助开发者深入理解Netty的设计原理和实现机制。 1. **Netty的基本概念**:Netty是一个基于NIO(非阻塞I/O)的Java框架,它简化了网络编程,提供了线程模型、缓冲...

    精通并发与Netty(共92讲)百度网盘地址.txt

    - **RPC框架**:如构建自己的RPC框架,实现远程过程调用。 #### 3.4 高级特性 - **自定义Codec**:允许开发者定义自己的编解码器,实现更复杂的协议处理。 - **ChannelPipeline**:一系列Handler的容器,按照顺序...

    Java Netty-入门教程.pdf

    其中值得一提的是,Netty 的主要贡献者之一也是 Apache MINA(另一个流行的网络应用框架)的重要贡献者。这表明 Netty 在设计之初就吸收了大量来自 MINA 的经验教训,并在此基础上进行了创新和发展。 ##### 1.3 ...

    dubbo源码分析系列

    Dubbo是一个流行的高性能Java RPC框架,由阿里巴巴开源并广泛应用于分布式服务的架构设计中。它的架构设计及源码风格分析对于理解开源项目和掌握分布式服务框架的原理和实现细节具有重要的指导作用。 在分布式服务...

    Java思维导图xmind文件+导出图片

    分布式架构 漫谈分布式架构 初识分布式架构与意义 如何把应用从单机扩展到分布式 大型分布式架构演进过程 分布式架构设计 主流架构模型-SOA架构和微服务架构 领域驱动设计及业务驱动... 手写实现多协议RPC框架

    Dubbo源码解读与实战_文档.zip

    Dubbo是阿里巴巴开源的一款Java RPC框架,它提供了服务注册与发现、远程调用、负载均衡、容错、监控等关键功能,广泛应用于微服务架构中。首先,我们需要理解RPC(Remote Procedure Call)的概念,它是进程间通信的...

    dubbo2.0源码解读

    《Dubbo 2.0 源码解读》是一份深度剖析Dubbo核心机制和技术细节的资料,旨在帮助开发者深入理解这一著名Java微服务框架的工作原理。以下是对这份资料主要知识点的详细阐述: 1. **源码阅读路径**:源码阅读是提升...

    spark 源码解读迷你书

    Spark作为一个分布式计算框架,其核心概念包括弹性分布式数据集(Resilient Distributed Datasets, RDD)、任务调度、内存管理以及容错机制。这本书可能会深入讲解这些概念的实现细节。 1. **RDD**:RDD是Spark的...

    dubbo最新全面深度解读

    Dubbo是基于Java的RPC框架,旨在提高服务的透明性和可扩展性。它提供了一种服务化的解决方案,使开发者可以像调用本地方法一样调用远程服务,降低了分布式系统的复杂性。 2. **框架结构** Dubbo的核心组件包括...

    xxl-job宣讲材料

    XXL-JOB是一个开源的分布式任务调度平台,其设计目标在于快速开发、简单学习、轻量级以及易于扩展。在2020年的培训材料中,它被介绍为已广泛应用于多家公司的线上产品线,提供了丰富的功能和服务。 **1. 架构和功能...

    dubbo源码学习.zip--------------

    Dubbox是Dubbo的一个社区分支,包含了更多的扩展功能,如支持RESTful API等。对这个版本的源码学习,有助于开发者更好地适应各种复杂的业务场景。 "服务端线程池溢出,异常客户端无响应-代码解读"这部分,将深入...

    dubbo源码分析pdf.zip

    Dubbo是一个高性能、轻量级的服务治理框架,广泛应用于微服务架构中,以实现服务的发布、发现、调用等核心功能。这套书籍包含了对Dubbo核心组件和原理的详细剖析,旨在提升开发者的源码阅读和系统设计能力。 首先,...

    Java开发面试必备知识技能总结视频合集

    - **Netty线程模型**:深入研究Netty框架的线程模型,包括`NioEventLoopGroup`、`Selector`等组件的作用。 - **源码解析**:通过对Netty源码的详细解析,掌握其高效并发处理能力的关键技术点。 #### 7. 架构师不得...

Global site tag (gtag.js) - Google Analytics