`
y806839048
  • 浏览: 1120540 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Dubbo服务调用过程 (编解码 + 服务提供方返回调用结果)

阅读更多

需求:

dubbo 的 消息体是怎么样的? 如何去定义消息体的。  另外我们都知道,当多个消费者对同一个dubbo 的provider 进行消费的时候,Dubbo 会将响应对象派发到线程池上,dubbo 是如何将响应对象从线程池线程传递到用户线程上。本文基于2.7.0 以下版本,2.7.0 版本,dubbo 协议的通信完全基于 completetableFuture 的方式调用。

(1):

首先预备一下知识。 Dubbo 的服务调用,再对dubbo 扩展和理解的时候,我们需要谨记下面的调用连过程,这样我们就知道扩展哪一层的SPI,最合适。层级是非常明确的。

 
 

(2).

我们来看看Dubbo 的消息头,对于编解码来说,我们需要知道dubbo 的消息头的组成有哪些,各个组成的部分有什么样的作用。下面的图来自与dubbo 的官网,也就是说,对于不同的协议,dubbo 的消息头的组成是不一样的,分别有 network (http 协议), tcp 协议,dubbo 协议等组成。 咱们来看看dubbo 的协议,dubbo 的协议通信层是netty,再进入到通信层之前,我们需要对dubbo 的头信息进行编码,那看看dubbo 的头信息有哪些。

 
 

那我们来具体的看看dubbo 的协议的消息头的组成部分。128位,一共16 个字节,每个自己都记录相关的元数据信息

0 ~ 7 : dubbo 魔数((short) 0xdabb) 高位,也就是 (short) 0xda。这个魔数是做什么使用的,是一种标识吗?其实在编码和解码的过程中会对比 这个魔数。 就像SUN 公司规定每个 class 文件都必须以一个 word(4 个字节 ) 来开始,这个数字就称为魔数。

8 ~ 15: dubbo 魔数((short) 0xdabb) 低位,也就是 (short) 0xbb。我有同样的疑惑,dubbo 高低位魔数是做什么使用的。

16: 数据包类型, 0 - Response, 1 - Request

17: 调用方式,  仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用,  这里我不太清楚何为双向调用? 我的想法是 dubbo 提供的callback 功能, 也就是说consumer 调用 provider, provider 有response 之后,会回调consumer 的相关函数。

18: 事件标识, 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包

19 ~ 23: 序列化器编号, 比如 2 是 Hessian2Serialization

24 ~ 31: 状态, 20 - OK, 30 - CLIENT_TIMEOUT, 31 - SERVER_TIMEOUT

32 ~ 95: 请求编号, 共8字节,运行时生成

96 ~ 127: 消息体长度, 运行时计算

 
 

下面来看看编码: encode

 

@Override publicvoidencode(Channel channel, ChannelBuffer buffer, Object msg)throwsIOException{

        if (msg instanceof Request) {

            // 对 Request 对象进行编码            encodeRequest(channel, buffer, (Request) msg);

        } else if (msg instanceof Response) {

            // 对 Response 对象进行编码,后面分析            encodeResponse(channel, buffer, (Response) msg);

        } else {

            super.encode(channel, buffer, msg);

        }

    }

然后进入 encodeRequest 方法,我们看看这个方法的过程:

// 创建消息头字节数组,长度为 16 byte[] header = new byte[HEADER_LENGTH];

// 设置魔数 Bytes.short2bytes(MAGIC, header);

// 设置数据包类型(Request/Response)和序列化器编号 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

// 设置通信方式(单向/双向) if (req.isTwoWay()) {

            header[2] |= FLAG_TWOWAY;

        }

// 设置事件标识 if (req.isEvent()) {

            header[2] |= FLAG_EVENT;

        }

// 获取 buffer 当前的写位置 int savedWriteIndex = buffer.writerIndex();

// 更新 writerIndex,为消息头预留 16 个字节的空间 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);

        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

        // 创建序列化器,比如 Hessian2ObjectOutput        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

        if (req.isEvent()) {

            // 对事件数据进行序列化操作            encodeEventData(channel, out, req.getData());

        } else {

            // 对请求数据进行序列化操作            encodeRequestData(channel, out, req.getData(), req.getVersion());

        }

// 获取写入的字节数,也就是消息体长度 int len = bos.writtenBytes();

        checkPayload(channel, len);

        // 将消息体长度写入到消息头中        Bytes.int2bytes(len, header, 12);

请大家好好补补对byte 数组进行运算的知识。 方可理解dubbo 源码里面到底是如何对128 位,16个字节进行运算的。并要清楚了解 byte 的运算中 &,|, >>, <<等运算符。

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。对data 的序列化过程如下,请看encodeRequestData方法,其大致过程如下:

// 依次序列化 dubbo version、path、version,  序列化调用方法名

out.writeUTF(version);

out.writeUTF(inv.getAttachment(Constants.PATH_KEY));

out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

out.writeUTF(inv.getMethodName());

// 将参数类型转换为字符串,并进行序列化

out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));

// 对运行时参数进行序列化 out.writeObject(encodeInvocationArgument(channel, inv, i));

// 序列化 attachments out.writeObject(inv.getAttachments());

下面再来看看 decode 的过程

默认情况下 Dubbo 使用 Netty 作为底层的通信框架。Netty 检测到有数据入站后,首先会通过解码器对数据进行解码

@Override protectedObjectdecode(Channel channel, ChannelBuffer buffer,intreadable,byte[] header)throwsIOException{

        // 检查魔数是否相等

   if (readable > 0 && header[0] != MAGIC_HIGH

                || readable > 1 && header[1] != MAGIC_LOW) {

            int length = header.length;

            if (header.length < readable) {

                header = Bytes.copyOf(header, readable);

                buffer.readBytes(header, length, readable - length);

            }

            for (int i = 1; i < header.length - 1; i++) {

                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {

                    buffer.readerIndex(buffer.readerIndex() - header.length + i);

                    header = Bytes.copyOf(header, i);

                    break;

                }

            }

            // 通过 telnet 命令行发送的数据包不包含消息头,所以这里            // 调用 TelnetCodec 的 decode 方法对数据包进行解码            return super.decode(channel, buffer, readable, header);

        }

        // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT        if (readable < HEADER_LENGTH) {

            return DecodeResult.NEED_MORE_INPUT;

        }

        // 从消息头中获取消息体长度        int len = Bytes.bytes2int(header, 12);

        // 检测消息体长度是否超出限制,超出则抛出异常        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;

        // 检测可读的字节数是否小于实际的字节数        if (readable < tt) {

            return DecodeResult.NEED_MORE_INPUT;

        }

        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {

            // 继续进行解码工作            return decodeBody(channel, is, header);

        } finally {

            if (is.available() > 0) {

                try {

                    StreamUtils.skipUnusedStream(is);

                } catch (IOException e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        }

    }

上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。

在DecodeableRpcInvocation class 中,会将dubbo version, path, method name, attachment 解码出来,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

下面来看看  服务提供方返回调用结果

服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。当响应数据解码完成之后,dubbo 会将响应对象派发到线程池上,但是线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。

那么我们首先来看看用户线程现在在干什么?  用户线程在发送完请求后,就调用defaultFuture 的get 方法等待响应对象的到来,当响应对象到来之后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。

doReceive 方法,请大家自行查看DefaultFuture 方法,需要去理解下, ReentrantLock,还有 lock.newcondition, 的用法。 并且它与synchronized, wait,notify 的区别。

那么调用编号是怎么回事呢?

一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:

 

 
 

注意: 本文的大部分内容来自于dubbo 官网对 dubbo 源码的分析,甚是奇妙,我也想编辑下来,最为以后的代码设计的参照。 

问个问题,请问 synchronized, ReentrantLock, ReadWriteLock , 之间的区别?

信号之间的通知是如何的,比如 notify, wait, ReentrantLock 的 condition 的使用。 谢谢大家时间,希望有所得!

 

参考:

https://www.jianshu.com/p/99a0bc93eeb6

 

 

分享到:
评论

相关推荐

    服务治理中间件 dubbo原理解析

    在Dubbo中,服务调用分为服务消费方发起请求和服务提供方接收调用请求两个步骤。这一过程涉及到通信层、编码解码模块、传输协议等组件的协同工作。 ### 通信层 通信层是Dubbo中负责网络通信的部分,它包括提供方...

    dubbo源码解析2

    一个典型的Dubbo HelloWorld例子通常包括服务提供者和服务消费者两个部分,通过简单的示例程序演示了如何注册服务、发布服务和调用服务。 #### 五、源文件概述 Dubbo的源代码主要由以下几个部分组成: 1. **core*...

    Dubbo入门到精通架构高级课程

    - **服务治理**:利用Dubbo提供的服务治理功能,实现服务的自动化运维。 - **异构系统集成**:不同技术栈的应用之间可以通过Dubbo进行通信,实现系统间的集成。 - **高性能服务**:对于需要高并发处理能力的场景,...

    dubbo——rpc的原理和netty1

    9. **解码并返回结果**:client stub接收并解码消息,将最终结果传递给服务消费方。 RPC框架的主要任务就是隐藏这些复杂的网络通信细节,使得开发者可以专注于业务逻辑的实现,而无需关心底层通信的具体操作。 ...

    Dubbo面试题(2022最新版)

    Dubbo提供了一种机制来检测服务提供者的健康状况,当检测到服务提供者长时间未响应时,可以将其从服务列表中移除,避免无效调用。 8. **同一个服务多个注册的情况下可以直连某一个服务吗?** 在某些特殊情况下,...

    dubbo源码解析2.01.pdf

    - **Provider**:服务提供方,即暴露服务的一方。 - **Consumer**:服务消费方,即调用远程服务的一方。 - **Registry**:注册中心,用于服务的发现和管理。 - **Monitor**:监控中心,用于服务的监控和统计。 ...

    Darks Codec是一个轻量级的通信消息协议编解码框架.rar

    Dubbo作为Java语言的RPC框架,优势之一在于屏蔽了调用细节,能够像调用本地方法一样调用远程服务,不必为数据格式抓耳饶腮。正是这一特性,也引入...(认为类的路径属于上下文信息)接下来揭秘Dubbo的编码解码过程。

    dubbo源码解析

    通过这种方式,**Dubbo**能够动态地加载不同的服务实现,这为系统的灵活性和扩展性提供了极大的便利。 #### 三、基于SPI思想的Dubbo内核实现 **Dubbo**进一步扩展了SPI的思想,提出了更加灵活和强大的扩展机制。在...

    分布式服务框架 dubbox

    - 远程调用过程包括客户端发起请求、服务端处理请求并返回结果、网络传输等环节,涉及请求编解码、连接管理等技术。 3. **服务治理**: - 负载均衡:Dubbo 提供多种负载均衡策略,如 Random、RoundRobin、...

    秒杀项目之服务调用&分布式session对应的完整项目

    秒杀项目是一个典型的高并发、低延迟的电商应用场景,它涉及到多个关键技术,包括服务调用和分布式Session管理。在这个项目中,我们主要关注如何在微服务架构下有效地处理这些问题。 一、服务调用 1. **API ...

    深入浅析Netty 在 Dubbo 中是如何应用的

    在Dubbo中,Consumer是服务消费方,它需要通过网络与提供者进行通信。在Dubbo的实现中,Consumer使用Netty作为网络通信的底层实现。当Consumer启动时,它会创建一个Netty的client对象,该对象负责与提供者进行通信。...

    35-二进制类RPC协议:还是叫NBA吧,总说全称多费劲1

    调用过程包括编码、序列化、网络传输、解码和任务处理。Dubbo默认使用Hessian2作为RPC协议,它将远程调用序列化为二进制,支持压缩,简化了协议约定的复杂性。 总结来说,二进制类RPC协议如Dubbo的Hessian2在数据...

    JAVA上百实例源码以及开源项目源代码

    5个目标文件,演示Address EJB的实现 ,创建一个EJB测试客户端,得到名字上下文,查询jndi名,通过强制转型得到Home接口,getInitialContext()函数返回一个经过初始化的上下文,用client的getHome()函数调用Home接口...

Global site tag (gtag.js) - Google Analytics