`
45808916
  • 浏览: 5785 次
文章分类
社区版块
存档分类
最新评论

从Consumer分析Dubbo调用链

 
阅读更多

入手

继上一篇不成熟的源码分析经历之后,为了搞清楚Consumer是如何与Provider通信的,于是又一言不合翻看起了源码。好,进入正题,依旧从RegistryDirectory这个核心类入手:

    // 这里的入参urls是所有可用的provider的url
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if(urls == null || urls.size() == 0){
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            //如果reference端配置了protocol,则只选择匹配的protocol
            if (queryProtocols != null && queryProtocols.length() >0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() 
                        + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // URL参数是排序的
            if (keys.contains(key)) { // 重复URL
                continue;
            }
            keys.add(key);
            // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // 缓存中没有,重新refer
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
                }
                if (invoker != null) { // 将新的引用放入缓存
                    newUrlInvokerMap.put(key, invoker);
                }
            }else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

上面一段代码虽然很长,但是其实做的事情比较简单:就是把ProviderURL封装成对应的Invoker,我们只需要关注下面这一行:

invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);

这里的protocol的真正的实现是DubboProtocol。不过其上还被两层包装类给包装了(这里涉及到dubbo的扩展点自动包装),分别是ProtocolListenerWrapperProtocolFilterWrapper。其中ProtocolListenerWrapper添加了监听器的功能,而ProtocolFilterWrapper添加了过滤器功能。

调用链Filter的实现

ProtocolListenerWrapper比较简单,这边就不展开看源码了,ProtocolFilterWrapper还是有必要看一下的,主要就看buildInvokerChain方法:

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

这里会按照Filter列表的顺序生成一个FilterChain,实际上是利用了Invoker来实现链式调用。这里每个Filter都会生成一个Invoker,然后该Invokerinvoke方法会调用自身Filter中的invoke方法。而最后一个Filter中的invoke方法的Invoker参数则是真正我们最后调用远程服务的Invoker,也就是DubboInvoker

DubboProtocol实现

通过两层包装类之后,就要调用到真正的Protocol实现类——DubboProtocol了,我们看看其中的refer方法干了些什么:

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

可以看到返回的invoker是一个DubboInvoker,并且在创建这个DubboInvoker时,我们注意到第三个参数是一个ExchangeClient[]类型的参数,从名字上很容易猜想出其中可能涉及了两端的通讯:

    private ExchangeClient[] getClients(URL url){
        //是否共享连接
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        //如果connections不配置,则共享连接,否则每服务每连接
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

默认情况下都是共享连接的,也就是ConsumerProvider之间不管有多少个Service,都只共享一条连接:

    private ExchangeClient getSharedClient(URL url){
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if ( client != null ){
            if ( !client.isClosed()){
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }
        ExchangeClient exchagneclient = initClient(url);

        client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client; 
    }

下面来看看客户端连接的初始化过程:

    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO存在严重性能问题,暂时不允许使用
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client ;
        try {
            //设置连接应该是lazy的 
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
                client = new LazyConnectExchangeClient(url ,requestHandler);
            } else {
                client = Exchangers.connect(url ,requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url
                    + "): " + e.getMessage(), e);
        }
        return client;
    }

默认不是lazy的,也就是说,在初始化时Consumer就直接与Provider建立了一条连接。

请求调用细节

而建立连接之后,后续就是利用这条连接来进行通讯了,让我们来跟踪一下客户端最终是如何通过这条连接来发送调用请求的。这里需要提一句,关于Dubbo中的Invoker,在客户端这一块,我个人认为应该要分成两类。一类的作用是将整个集群伪装成一个Invoker,这类Invoker的典型特征是都继承于AbstractClusterInvoker。而另外一类则是真正可调用的,也就是类似DubboInvoker这类,下面我们就来看看DubboInvoker中的核心方法:

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                // 这里还有一个参数isSent,表明是否等待消息发出
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

这里可以很清晰地看到,远程调用分三种情况:
1. 不需要返回值的调用(所谓oneWay)
2. 异步(async)
3. 同步

对于第一种情况,客户端只管发请求就完了,不考虑返回结果。
对于第二种情况,客户端除了发请求,还需要将结果塞到一个ThreadLocal变量中,以便于客户端get返回值
对于第三种情况,客户端除了发请求,还会同步等待返回结果

看了源代码之后,我们再来看看官方文档上对于同步/异步调用的描述:

这里写图片描述

是不是很清晰?

<script type="text/javascript"> $(function () { $('pre.prettyprint code').each(function () { var lines = $(this).text().split('\n').length; var $numbering = $('<ul/>').addClass('pre-numbering').hide(); $(this).addClass('has-numbering').parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($('<li/>').text(i)); }; $numbering.fadeIn(1700); }); }); </script>
分享到:
评论

相关推荐

    Dubbo调用java接口程序

    在实现Dubbo调用Java接口的过程中,我们需要以下步骤: 1. **创建服务接口**:定义一个Java接口,如`HelloService`,包含你需要的方法。 ```java public interface HelloService { String sayHello(String name);...

    dubbo-Consumer,dubbo-provider

    在本项目中,`dubbo-consumer`模块可能包含了消费者端的代码,用于通过Dubbo框架来远程调用`dubbo-provider`提供的服务。它通常会定义服务接口,并在运行时通过Dubbo的注册中心找到服务提供者的地址,然后进行RPC...

    dubbo调用的例子

    Dubbo调用例子详解 在分布式系统开发中,Dubbo是一个广泛应用的服务框架,它由阿里巴巴开源,旨在提高服务治理的效率和灵活性。本教程将深入探讨一个基于Dubbo的调用例子,涵盖从接口定义、服务提供者到消费者的...

    maven-dubbo-consumer

    【maven-dubbo-consumer】项目是一个基于Maven构建的Dubbo消费者端示例,它展示了如何在Java应用中使用Dubbo框架来调用服务提供者的接口。Dubbo是一款高性能、轻量级的服务治理框架,它主要实现了服务的发布、发现、...

    dubbo-demo-consumer、dubbo-demo-provider、dubbo-simple-monitor

    本篇将详细讲解基于dubbo-demo-consumer、dubbo-demo-provider和dubbo-simple-monitor的实例服务,带你深入理解Dubbo的核心概念和操作流程。 首先,我们来看`dubbo-demo-consumer`,它是Dubbo服务的消费者。消费者...

    dubbo提供与调用

    Dubbo是中国阿里巴巴开源的一款高性能、轻量级的Java服务治理框架,它主要负责服务的发布、发现、调用以及监控。在"**dubbo提供与调用**"这个主题下,我们将深入探讨Dubbo的核心概念、配置文件以及Web配置。 1. **...

    Dubbo 实现远程调用

    1、Dubbo 远程调用实现 2、内带zookeeper-3.4.5消息服务 3、直接导入myeclipse运行:dubbo-server导入tomcat中运行 4、dubbo-client 运行测试类/dubbo-client/src/com/fengjx/main/Consumer.java

    Bean以及注解的方式实现dubbo调用(Maven)

    本文将详细介绍如何通过Bean和注解的方式来实现Dubbo调用,同时结合Maven项目管理工具进行构建。 首先,我们需要了解在Maven项目中配置Dubbo的基本步骤。在`pom.xml`文件中,添加Dubbo相关的依赖,例如: ```xml ...

    dubbo provider consumer zookeeper

    Dubbo是阿里巴巴开源的一款高性能、轻量级的Java RPC框架,它提供了面向接口的代理实现,使得服务调用变得简单。在本示例中,“dubbo provider consumer zookeeper”涉及了Dubbo的核心组件——服务提供者(Provider...

    springboot 调用dubbo实例

    本篇文章将深入探讨如何在Spring Boot应用中调用Dubbo服务实例,以及相关的配置与实践步骤。 首先,Spring Boot是由Pivotal团队提供的一个开源框架,它旨在简化Spring应用程序的初始搭建以及开发过程。通过内置的...

    dubbo-provider/dubbo-consumer

    服务消费者(dubbo-consumer)则是调用远程服务的模块,它负责从注册中心查找并消费服务提供者提供的服务。服务消费者在调用服务时,可以根据服务名、版本号等信息找到对应的服务实例,并发起RPC(Remote Procedure ...

    Dubbo+zookeeper调用java接口

    在压缩包文件"Dubbo调用java接口案例"中,我们可能看到以下内容: - **服务接口类(Interface.java)**:定义了供调用的接口方法。 - **服务实现类(Impl.java)**:实现了接口中的业务逻辑。 - **服务提供者配置...

    spring boot 集成 dubbo consumer provider 例子

    分别启动 Provider 和 Consumer 项目,Consumer 会自动从 Zookeeper 注册中心发现并消费 Provider 的服务。你可以通过调用 `HelloConsumer.consume()` 方法来测试服务调用。 8. **扩展与优化** Dubbo 还提供了...

    dubbo-consumer.7z

    综上所述,"dubbo-consumer.7z" 提供的项目是一个完整的 Dubbo 消费者端实现,开发者可以通过分析和运行这个项目,学习如何配置和使用 Dubbo 进行服务调用,同时掌握 Maven 的基本操作和 IntelliJ IDEA 的项目配置。...

    dubbo_consumer_provider例子

    Dubbo是一个基于Java的RPC框架,其主要目标是通过透明化远程过程调用,使服务消费就像调用本地方法一样简单。它提供了服务注册与发现、负载均衡、调用链路监控等一系列功能,极大地简化了分布式系统开发。 在"**...

    dubbo-consumer.zip

    4. **监控与日志**:消费者可以记录调用日志,配合Dubbo的监控平台,进行服务调用性能分析和问题排查。 总之,`dubbo-consumer.zip`不仅是一个压缩包,更是Dubbo消费者端的完整实现,它封装了服务调用的复杂性,让...

    基于groovy实现 java脚本动态编译、部署、发布;可以通过脚本直接调用dubbo接口.zip

    首先,你需要在脚本中导入Dubbo的相关依赖,然后创建Dubbo的消费者(Consumer),配置服务的URL,最后调用服务方法: ```groovy import com.alibaba.dubbo.config.ApplicationConfig; import ...

    自己手动实现dubbo源码

    2. `dubbo-consumer-webtest`:这是一个服务消费者工程,用于调用服务提供者的服务。服务消费者通过从注册中心获取服务提供者的信息,然后进行远程调用。在这一工程中,你需要配置服务引用、设置服务版本匹配规则、...

    分布式 rpc远程调用 dubbo

    标题“分布式rpc远程调用dubbo”直指Dubbo在分布式环境中的核心功能——RPC调用。Dubbo通过RPC实现了服务提供者(Provider)和服务消费者(Consumer)之间的透明通信。服务提供者发布服务到服务注册中心,如...

    Dubbo超时机制导致的雪崩连接

    假设有一个具体的案例:在一个电商系统中,前端服务(如marketingfront)需要从后端服务(如wsproductreadserver)获取商品信息,这个过程涉及到Dubbo的服务调用。在高并发的情况下,服务调用出现了异常情况: 1. *...

Global site tag (gtag.js) - Google Analytics