`
manzhizhen
  • 浏览: 293107 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论
阅读更多

         刚开始使用Dubbo的人,可能对Dubbo的第一印象就是它是一个RPC框架,当然,所有的分布式框架都少不了相互通信的过程,何况Dubbo的任务就是帮助分布式业务系统完成服务的通讯、负载、注册、发现和监控等功能。不得不承认,RPC是Dubbo提供服务的核心流程,为了兼容多种使用场景,Dubbo显然需要提供多种RPC方式(协议).

         开发一个简单的RPC框架,重点需要考虑的是两点,即编解码方式和底层通讯协议的选型,编解码方式指的是需要传输的数据在调用方将以什么组织形式拆解成字节流并在服务提供方以什么形式解析出来。编解码方式的设计需要考虑到后期的版本升级,所以很多RPC协议在设计时都会带上当前协议的版本信息。而底层通讯协议的选型都大同小异,一般都是TCP(当然也可以选择建立于TCP之上更高级的协议,比如Avro、Thrift和HTTP等),在Java语言中就是指套接字Socket,当然,在Netty出现后,很少RPC框架会直接以自己写Socket作为默认实现的通讯方式,但通常也会自己实现一个aio、nio或bio版本给那些“不方便”依赖Netty库的应用系统来使用。

         在Dubbo的源码中,有一个单独模块dubbo-rpc,其中,最重要的应该是Protocol和Invoker两个接口,代表着协议(编解码方式)和调用过程(通讯方式)。Invoker接口继承于Node接口,Node接口规范了Dubbo体系中各组件之间通讯的基本要素:

 

public interface Node {

    // 协议数据载体

    URL getUrl();

    // 状态监测,当前是否可用

    boolean isAvailable();

    // 销毁方法

    void destroy();

}

 

而Invoker接口则更简单:

 

public interface Invoker<T> extends Node {

    // 获取调用的接口

    Class<T> getInterface();

    // 调用过程

    Result invoke(Invocation invocation) throws RpcException;

}

 

从源代码dubbo-rpc下的子模块来看,我们能知道目前Dubbo支持dubbo(默认)、hessian、http、injvm(本地调用)、memcached、redis、rmi、thrift和webservice等9中RPC方式。根据Dubbo的官方手册,injvm是一个伪协议,它不开启端口,不发起远程调用,只在JVM内直接关联,但执行Dubbo的Filter链,所以这一般用于线下测试。可是为啥Memcached和Redis也能用作RPC?这里是指Dubbo端作为服务消费方,而Memcached或Redis作为服务提供方。

       我们这里重点看调用方(服务消费方)部分的代码。

       虽然Invoker接口中定义的是invoke方法,invoker方法的实现理应RPC的整个操作,但为了状态检查、上下文切换和准备、异常捕获等,抽象类AbstractInvoker中定义了一个doInvoker抽象方法来支持不同的RPC方式所应做的纯粹而具体的RPC过程,我们直接看AbstractInvoker中的invoker实现:

 

public Result invoke(Invocation inv) throws RpcException {

    if(destroyed) {

        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()

                                        + " use dubbo version " + Version.getVersion()

                                        + " is DESTROYED, can not be invoked any more!");

    }

    RpcInvocation invocation = (RpcInvocation) inv;

    invocation.setInvoker(this);

    // 填充接口参数

    if (attachment != null && attachment.size() > 0) {

       invocation.addAttachmentsIfAbsent(attachment);

    }

    // 填充业务系统需要透传的参数

    Map<String, String> context = RpcContext.getContext().getAttachments();

    if (context != null) {

       invocation.addAttachmentsIfAbsent(context);

    }

    // 默认是同步调用,但也支持异步

    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){

       invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

    }

 

    /**

     * 幂等操作:异步操作默认添加invocation id,它是一个自增的AtomicLong

     * 可以在RpcContext中设置attachments{@link Constants.ASYNC_KEY}值来设置是同步还是异步

     */

    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

 

    try {

       

        // 执行具体的RPC操作

        return doInvoke(invocation);

 

    // 异常处理的代码略去

    } catch (InvocationTargetException e) {

    } catch (RpcException e) {

    } catch (Throwable e) {

    }

}

       可以看出主要是用来做参数填充(包括方法参数、业务参数和Dubbo内定的参数),然后就直接调用具体的doInvoker方法了。Dubbo所支持的RPC协议都需继承AbstractInvoker类。

         我们先来看看Dubbo中默认的dubbo协议的实现,即DubboInvoker,直接看其doInvoker的实现:

 

@Override

protected Result doInvoke(final Invocation invocation) throws Throwable {

    RpcInvocation inv = (RpcInvocation) invocation;

    final String methodName = RpcUtils.getMethodName(invocation);

    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());

    inv.setAttachment(Constants.VERSION_KEY, version);

 

    // 确定此次调用该使用哪个client(一个client代表一个connection

    ExchangeClient currentClient;

    if (clients.length == 1) {

        currentClient = clients[0];

    } else {

        // 如果是多个client,则使用简单的轮询方式来决定

        currentClient = clients[index.getAndIncrement() % clients.length];

    }

    try {

        // 是否异步调用

        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);

        // 是否单向调用,注意,单向调用和异步调用相比不同,单向调用不等待被调用方的应答就直接返回

        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);

        if (isOneway) {

           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);

            // 单向调用只负责发送消息,不等待服务端应答,所以没有返回值

            currentClient.send(inv, isSent);

            RpcContext.getContext().setFuture(null);

            return new RpcResult();

        } else if (isAsync) {

           ResponseFuture future = currentClient.request(inv, timeout);

            // 异步调用先保存future,便于后期处理

            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));

            return new RpcResult();

        } else {

            // 默认的同步调用

           RpcContext.getContext().setFuture(null);

            return (Result) currentClient.request(inv, timeout).get();

        }

    } catch (TimeoutException e) {

        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);

    } catch (RemotingException e) {

        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);

    }

}

 

从上面的代码可以看出dubbo协议中分为三种调用方式同步(默认)、异步和OneWay,同步好理解,就是阻塞等拿到被调用方的结果再返回,异步也好理解,不等待被调用者的处理结果就直接返回,但需要等到被调用者接收到异步请求的应答,OneWay(单向调用)在很多MQRPC框架中都有出现,即调用方只负责调用一次,不管被调用方是否接收到该请求,更不会去理会被调用方的任何应答,OneWay一般只会在无需保证调用结果的时候使用。在《Dubbo源代码实现二》中我们已经提到过,负载的策略决定此次服务调用是请求哪个服务提供方(也就是哪台服务器),当确定了调用哪个服务提供房后,其实也就是确定了使用哪个Invoker,这里指DubboInvoker实例。RPC框架为了提高服务的吞吐量,通常服务消费方和服务提供方的服务器之间会建立多个连接,如上面代码中的clients所以在确定使用哪个DubboInvoker实例后,会从中选择一个(如上面代码的取模轮询)client来进行RPC调用。从上面给出的代码可以看出,同步和异步的区别只是同步直接在currentClient.request返回的Future对象上进行了get操作来直接等待结果的返回。

       Dubbo中的Client实例都是ExchangeClient的实现,而每个Client实例都会绑定一个Channel的实例,来处理通讯的具体细节,而所有的Channel实例都实现了ExchangeChannel接口。这里我们先来看看HeaderExchangeChannel#request的实现:

 

public ResponseFuture request(Object request, int timeout) throws RemotingException {

    if (closed) {

        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");

    }

    // create request.

    Request req = new Request();

    req.setVersion("2.0.0");

    // 相比OneWay,同步和异步调用属于TwoWay

    req.setTwoWay(true);

    req.setData(request);

    // 创建DefaultFuture,用于将请求和应答关联起来

    DefaultFuture future = new DefaultFuture(channel, req, timeout);

    try{

        // 直接发送调用请求

        channel.send(req);

    }catch (RemotingException e) {

        future.cancel();

        throw e;

    }

 

    // future返回,用于拿到服务调用的返回值

    return future;

}

 

 

从上面代码可以看出,在直接调用channel.send发送数据时,先创建了一个DefaultFuture,它主要用于关联请求和应答,DefaultFuture将稍后分析。后面,直接调用了Channel的send方法,dubbo协议底层直接使用了Netty框架,所以这里指的是NettyChannel见NettyChannel#send的代码:

     

public void send(Object message, boolean sent) throws RemotingException {

    super.send(message, sent);

   

    boolean success = true;

    int timeout = 0;

    try {

        ChannelFuture future = channel.write(message);

        /**

         * sent值只是为了性能调优,默认是false

         */

        if (sent) {

            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            success = future.await(timeout);

        }

        Throwable cause = future.getCause();

        if (cause != null) {

            throw cause;

        }

    } catch (Throwable e) {

        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);

    }

   

    // senttrue且数据发送时间超过指定的超时时间时,由Dubbo负责抛出异常

    if(! success) {

        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()

                + "in timeout(" + timeout + "ms) limit");

    }

}

  

根据Dubbo用户手册中所说,sent参数的配置主要用于性能调优,这里当sent为true时(默认为false),将直接使用Netty的ChannelFuture来实现在给定的超时时间内等待,如果数据发送时间超过指定的超时时间,则抛出异常。之所以这样做,是为了将Netty框架处理时间控制在超时时间范围内,否则Dubbo框架在外围做的超时机制(DefaultFuture)将徒劳。

       接下来,我们看看Dubbo如何将请求和应答关联起来的,前面看到的HeaderExchangeChannel#request实现中,创建了一个Request对象,Request中有一个mId,用来唯一表示一个请求对象,而该mId在new的时候就会创建:

 

public Request() {

    mId = newId();

}

 

private static long newId() {

    // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID

    return INVOKE_ID.getAndIncrement();

}

 

DefaultFuture靠的就是这个mId来关联请求和应答消息,DefaultFuture中有两个很重要的属性:FUTURSCHANNELS,它们类型都是ConcurrentHashMapkeymId,在新建DefaultFuture对象时会把mId和相关的Future和Channel塞到这两个Map中,还有一个ReentrantLock类型的lock属性,用于阻塞来等待应答,我们直接看DefaultFuture中获取结果和接收到应答后的实现:

 

public Object get(int timeout) throws RemotingException {

    if (timeout <= 0) {

        // 默认的超时时间是1

        timeout = Constants.DEFAULT_TIMEOUT;

    }

    if (! isDone()) {

        long start = System.currentTimeMillis();

        lock.lock();

        try {

            while (! isDone()) {

                // 最多等制定的超时时间

                done.await(timeout, TimeUnit.MILLISECONDS);

                // 如果已经有结果或者已经超过超时时间,则break

                if (isDone() || System.currentTimeMillis() - start > timeout) {

                    break;

                }

            }

        } catch (InterruptedException e) {

            throw new RuntimeException(e);

        } finally {

            lock.unlock();

        }

        if (! isDone()) {

            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));

        }

    }

    return returnFromResponse();

}

 

public static void received(Channel channel, Response response) {

    try {

        // 获取并移除该mIdFuture

        DefaultFuture future = FUTURES.remove(response.getId());

        if (future != null) {

            future.doReceived(response);

        } else {

            logger.warn("The timeout response finally returned at "

                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))

                        + ", response " + response

                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()

                            + " -> " + channel.getRemoteAddress()));

        }

    } finally {

        // 获取并移除该mIdChannel

        CHANNELS.remove(response.getId());

    }

}

 

private void doReceived(Response res) {

    lock.lock();

    try {

        response = res;

        if (done != null) {

            // 释放信号

            done.signal();

        }

    } finally {

        lock.unlock();

    }

    if (callback != null) {

        invokeCallback(callback);

    }

}

 

由于received是静态方法,所以可以直接在Netty中注册的Handler中使用。

       那服务消费方和服务提供方的连接数量是由谁决定的呢?这个我们可以直接看DubboInvoker的创建方DubboProtocol中的代码:

 

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

    // create rpc invoker.

    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

    invokers.add(invoker);

    return invoker;

}

 

private ExchangeClient[] getClients(URL url){

    //是否共享连接

    boolean service_share_connect = false;

    /** 如果在dubbo:reference中没有设置{@link Constants.CONNECTIONS_KEY},则默认是共享连接  */

    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);

    //如果connections不配置,则共享连接,否则每服务每连接

    if (connections == 0){

        service_share_connect = true;

        connections = 1;

    }

 

    // 一个client维护一个connection

    ExchangeClient[] clients = new ExchangeClient[connections];

    for (int i = 0; i < clients.length; i++) {

        if (service_share_connect){

            // 使用共享的TCP长连接

            clients[i] = getSharedClient(url);

        } else {

            // 单独为该URL建立TCP长连接

            clients[i] = initClient(url);

        }

    }

    return clients;

}

 

 

从getClients的代码可以看出,服务消费方和服务提供方的服务器之间的连接数量是可以配置的,服务消费方和服务提供方都可以配置,当然服务消费方优先级更高,例如:

服务消费方A:<dubbo:reference   interface="com.foo.BarServiceA"   /> 

服务消费方A:<dubbo:reference   interface="com.foo.BarServiceB"  connections="5"  /> 

服务提供方B:<dubbo:service  interface="com.foo.BarServiceA"  /> 

服务提供方B:<dubbo:service  interface="com.foo.BarServiceB"  connections="10"  /> 

对于服务BarServiceA,由于消费方和提供方都没有配置connections,所以,所有类似于BarServiceA这样没有配置connections的服务,消费方服务器和提供方服务器将公用一个TCP长连接,即上面代码说提到的共享连接。而对于服务BarServiceA,因为配置了connections属性,消费方A和提供方B之间将单独建立5个(消费方配置优先级高于服务端配置,所以这里是5而不是10TCP长连接来专门给服务BarServiceA使用,以提高吞吐量和性能,至于每次调用应该如何从这5个连接中选,前面已经提到,这里不再阐述。所以,为了提高某个服务的吞吐量,可以试着配置connections属性,当然,前提是服务提供方性能过剩。

<!--StartFragment--> <!--EndFragment-->

         对于异步调用,Dubbo的默认调用过滤链中有一个FutureFilter,当我们在dubbo:reference中配置了async="true"后,将会执行FutureFilter中的异步逻辑,这里不再阐述,感兴趣的同学可以去阅读FutureFilter#asyncCallback部分的代码。

0
0
分享到:
评论
1 楼 miss_fish 2017-06-11  
这么详细的文章,居然没有人评价。。。 谢谢~~~

相关推荐

    Dubbo源代码(2.8.4)

    Dubbo提供了丰富的API供开发者使用,包括服务提供者和服务消费者API,以及配置管理API等。 8. **社区支持**: Dubbo拥有活跃的社区,开发者可以在其中找到大量的问题解答、使用案例和最佳实践。 总的来说,Dubbo...

    Dubbo源代码(2.5.4)

    【Dubbo源代码(2.5.4)】是一份重要的开源项目资源,它包含了Dubbo框架在2.5.4版本的完整源代码。Dubbo是中国阿里巴巴公司贡献的高性能、轻量级的服务治理框架,它专注于服务调用、监控和服务治理。这个版本的源...

    dubbo完整案例,包含服务方和消费方完整代码

    在本案例中,我们看到的是一个完整的Dubbo项目,包括服务提供者(Service Provider)和服务消费者(Service Consumer)的源代码,结合了Zookeeper作为注册中心以及Spring和MyBatis(SSM)的集成。这个案例是为了解决...

    dubbo2.8.4源代码

    服务提供方和服务消费方通过接口定义进行交互,实现了服务间的解耦。 2. **服务注册与发现**:Dubbo支持多种注册中心,如Zookeeper、Eureka等,服务提供者启动时会向注册中心注册自己的服务,消费者则从注册中心...

    dubbo_rpc.zip

    Dubbo作为RPC框架,主要扮演服务提供者和服务消费者的角色,通过中间的注册中心实现服务的发布、查找和调用。 2. **Dubbo核心组件** - **服务提供者(Provider)**:暴露服务的服务提供方。 - **服务消费者...

    微服务 spring dubbo项目:dubbo rpc;druid数-spring-dubbo-service.zip

    Dubbo的RPC机制使得服务消费者和服务提供者可以透明地进行交互,简化了分布式系统的设计。 5. **Druid数据源**:Druid是一个强大的数据库连接池组件,提供了监控、SQL解析、连接有效性检查、池预热、慢查询日志等...

    Go-rpcx是一个类似Dubbo和Motan的分布式的RPC服务框架

    了解了Go-rpcx的基本概念后,你可以通过阅读`rpcx-master`中的源代码和示例来深入学习。这将帮助你掌握如何配置、使用和扩展Go-rpcx,以适应你的特定需求。 总的来说,Go-rpcx是Golang开发者在构建分布式系统时的一...

    eclipse 中实现dubbo provider、customer源代码及zookeeper、dubbo-admin压缩包

    在本压缩包中,你将找到关于如何在Eclipse环境中实现Dubbo服务提供者(Provider)和服务消费者(Consumer)的源代码,以及如何利用Zookeeper作为注册中心和Dubbo管理员控制台(dubbo-admin)的相关资源。这个压缩包...

    SSM+dubbo项目源代码

    SSM+dubbo项目源代码是基于Spring、SpringMVC、MyBatis以及Dubbo构建的微服务架构。这个项目展示了如何将这些技术有效地整合在一起,以实现高效、可扩展的分布式系统。 首先,Spring框架是Java领域最广泛使用的依赖...

    阿里巴巴dubbo-2.5.4源代码

    在源代码中,`Provider`接口定义了服务提供者的基本行为,而`Consumer`接口则定义了服务消费者的接口。两者通过`Registry`注册中心进行连接,实现服务的发布、订阅和调用。 2. **服务注册与发现** `Registry`模块...

    Dubbo源代码分析之远程调用过程(2.5.4开发版)

    该文档分析了 Dubbo 框架中 RPC 调用的整个流程,并基于源代码按照执行 时序进行说明,源码版本为2.5.4开发版。 涉及的关键点包括:Invocation、Invoker、Directory、路由、负载均衡、集群容错、过滤器以及监控模块...

    Dubbo服务框架 v2.7.22.zip

    7. **API与配置中心**:Dubbo提供了一套完整的API,方便开发者在代码中进行服务的注册、引用、调用等操作。同时,支持通过配置中心动态修改服务配置,实现热更新。 在压缩包中的"dubbo-dubbo-2.7.22"文件夹里,你...

    dubbo调用示例

    Dubbo调用示例是一个关于如何使用Dubbo框架构建服务提供者(provider)和服务消费者(custmer App)的教程。Dubbo是阿里巴巴开源的一款高性能、轻量级的Java RPC框架,它提供了服务注册、服务发现、负载均衡等功能,...

    dubbo实例代码

    而“src”目录则是源代码存放的地方,可能包含服务提供者和服务消费者的实现代码,以及相应的配置文件,如dubbo配置、Zookeeper连接配置等。通过这些代码,我们可以进一步了解如何在实际项目中集成和使用Dubbo和...

    alibaba-dubbo-dubbo-2.5.7-0-ge2d63ad.tar.gz

    1. **dubbo-rpc**:这是Dubbo的核心模块,实现了基于接口的远程调用,支持多种协议,如Hessian2、HTTP、RMI等,使得服务提供者和服务消费者之间可以透明通信。 2. **dubbo-remoting**:负责网络通信,包括客户端和...

    dubbo学习后自己的源代码

    本项目是基于Dubbo的学习实践,包含了作者自己编写的源代码,分为三个部分:`dubbo-demo-api`、`dubbo-demo-provider_api`和`dubbo-demo-consumer_api`,分别代表了服务接口定义、服务提供者和服务消费者。...

    【Dubbo+Zookeeper的RPC分布式集群服务系统】服务端接口.zip

    【Dubbo+Zookeeper的RPC分布式集群服务系统】服务端接口.zip这个压缩包包含了基于Dubbo和Zookeeper构建的RPC分布式服务系统的源代码。Dubbo是阿里巴巴开源的一个高性能、轻量级的服务框架,它提供了服务治理、负载...

    Dubbo源码(注释版)

    此压缩包提供的" Dubbo源码(注释版)"是针对Dubbo 2.7.3版本的源代码,包含了详细的注释,方便开发者深入理解其内部机制。 一、服务注册与发现 Dubbo的核心功能之一是服务注册与发现。服务提供者在启动时会将自身...

    dubbo+netty打造高性能的RPC

    在这样的组合下,dubbo作为服务提供者和服务消费者之间的桥梁,利用Zookeeper作为注册中心,实现服务的自动发现和负载均衡。同时,通过Netty的高性能网络编程能力,可以优化RPC通信过程,提升服务调用的效率。http和...

    dubbo-3.1.4源码包

    在【压缩包子文件的文件名称列表】"apache-dubbo-3.1.4-src"中,你可以找到dubbo的主要模块和源代码,包括: 1. `dubbo-common`:包含了dubbo的核心通用模块,如配置管理、日志处理、序列化等。 2. `dubbo-remoting...

Global site tag (gtag.js) - Google Analytics