`
manzhizhen
  • 浏览: 293753 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Dubbo源代码分析九:优雅停机

阅读更多

虽然我们系统的用户体验和数据一致性不应该完全靠优雅停机来保证,但作为一流的RPC框架,优雅停机的功能必不可少,Dubbo用户手册有对优雅停机做一个简单的叙述:

 

Dubbo是通过JDK的 ShutdownHook 来完成优雅停机的,所以如果用户使用 kill -9 PID 等强制关闭指令,是不会执行优雅停机的,只有通过 kill PID 时,才会执行。

服务提供方:停止时,先标记为不接收新请求,新请求过来时直接报错,让客户端重试其它机器。然后,检测线程池中的线程是否正在运行,如果有,等待所有线程执行完成,除非超时,则强制关闭。

服务消费方:停止时,不再发起新的调用请求,所有新的调用在客户端即报错。然后,检测有没有请求的响应还没有返回,等待响应返回,除非超时,则强制关闭。

 

从官方的描述来看,服务提供者进行优雅停机时,将不在接收新的请求,新的请求过来将直接报错,需要客户端配置重试机制来重试其他服务器;而服务消费者进行优雅停机时,会将Dubbo调用拦截在自己这方。官方给的方案有些简单粗暴,主要依赖的是系统上游消费者的重试,但很多情况下,微服务之间为了避免雪崩或流量风暴,除了特别重要的服务,几乎都关闭了重试的功能。

 

为了形象说明,我们通过一个场景来分析Dubbo的优雅停机做法,如下图:


 

服务调用图

 

服务ABCD之间通过Dubbo来通信,假设一次RPC调用顺序经历上图①②③三个步骤,我们的目标是对B服务进行优雅停机,当然,在分布式环境,A、B、C、D服务会有多个,为简单起见,图中只画了一个B服务。

 

话不多说,我们先从源代码角度看现有的Dubbo(本文使用的是2.5.3版本)的优雅停机是如何做的。官方文档已经告诉我们,如果ShutdownHook失效,用户可以自行调用ProtocolConfig.destroyAll()来主动进行优雅停机,可见我们该从这方法入手:

 

public static void destroyAll() {

    // 1.关闭所有已创建注册中心

    AbstractRegistryFactory.destroyAll();

    ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);

    for (String protocolName : loader.getLoadedExtensions()) {

        try {

            Protocol protocol = loader.getLoadedExtension(protocolName);

            if (protocol != null) {

                // 2.关闭协议类的扩展点

                protocol.destroy();

            }

        } catch (Throwable t) {

            logger.warn(t.getMessage(), t);

        }

    }

}

 

可以看出,该方法主要做两件事情:步骤一. 和注册中心断连、步骤二. 关闭协议暴露(包括provider和consumer)。

 

步骤一简单来说就是通过 AbstractRegistryFactory.destroyAll() 来“撤销”在所有注册中心注册的服务,一般来说我们只会用一个注册中心,比如ZooKeeper,所以此时就是去调用ZkClient客户端的close方法(使用Curator也类似)。对于上面的服务调用图来说,就是关闭ZK(注册中心)和服务B的长连接(会话Session),这样的话,“过一阵子”A服务的地址列表中将不会有B服务的地址了。理想的情况下,步骤一后就不会有新的调用请求到达B服务了。

 

步骤二是关闭自己暴露的服务和自己对下游服务的调用。假设我们使用的是dubbo协议,protocol.destroy()其实会调用DubboProtocol#destroy方法,该方法部分摘要如下:

public void destroy() {

         // 关闭暴露的服务

    for (String key : new ArrayList<String>(serverMap.keySet())) {

        ExchangeServer server = serverMap.remove(key);

        if (server != null) {

            // 关闭该接口暴露的服务

            server.close(getServerShutdownTimeout());

   }

    }

   

        // 关闭对下游服务的调用

    for (String key : new ArrayList<String>(referenceClientMap.keySet())) {

        ExchangeClient client = referenceClientMap.remove(key);

        if (client != null) {

             client.close();

        }

    }

   

    stubServiceMethodsMap.clear();

    super.destroy();

}

 

我们可以看到顺序,是先关闭provider,再关闭consumer,这理解起来也简单,不先关闭provider,就可能会一直有对下游服务的调用。代码中的getServerShutdownTimeout()是获取“provider服务关闭的最长等待时间”的配置,即通过dubbo.service.shutdown.wait来设置的值,单位毫秒,默认是10秒钟,为了探究关闭provider的细节,我们来分析下HeaderExchangeServer#close方法:

 

public void close(final int timeout) {

    if (timeout > 0) {

        final long max = (long) timeout;

        final long start = System.currentTimeMillis();

        if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, false)){

            sendChannelReadOnlyEvent();

        }

  

      // 如果还有进行中的任务并且没有到达等待时间的上限,则继续等待

        while (HeaderExchangeServer.this.isRunning()

                && System.currentTimeMillis() - start < max) {

            try {

  // 休息10毫秒再检查

                Thread.sleep(10);

            } catch (InterruptedException e) {

                logger.warn(e.getMessage(), e);

            }

        }

    }

        // 关闭心跳,停止应答

    doClose();

        // 关闭通信通道

    server.close(timeout);

}

 

其中HeaderExchangeServer.this.isRunning()是用来检测是否还有正在进行中的调用(如果读者对如何判断是否有进行中的任务,可以参看DefaultFuture),如果没有进行中的调用或者等待时间还达到上限(上面提到的dubbo.service.shutdown.wait),则立马调用关闭provider操作(while后面的doClose()操作)。但这里会有个小问题,因为provider从注册中心撤销服务和上游consumer将其服务从服务列表中删除并不是原子操作,如果集群规模过大,可能导致上游consumer的服务列表还未更新完成,我们的provider这时发现当前没有进行中的调用就立马关闭服务暴露,导致上游consumer调用该服务失败。所以,dubbo默认的这种优雅停机方案,需要建立在上游consumer有重试机制的基础之上,但由于consumer增加重试特性会增加故障时的雪崩风险,所以大多数分布式服务不愿意增加服务内部之间的重试机制,这样就比较尴尬了,其实dubbo.service.shutdown.wait的值主要是为了防止优雅停机时的无限等待,即限制等待上限,我们也应该用一个参数来设置等待下限,这样整个分布式系统几乎不需要通过重试来保证优雅停机,只需要给与上游consumer少许时间,让他们足够有机会更新完provider的列表就行,虽然dubbo目前并不打算这么做。

我们接下来看看doClose()中做了些什么:

 

private void doClose() {

    if (closed) {

        return;

    }

        // 修改标记位,该标记为设置为true后,provider不再对上游请求做应答

    closed = true;

        // 取消心跳的Futrue

    stopHeartbeatTimer();

    try {

                 // 关闭心跳的线程池

        scheduled.shutdown();

    } catch (Throwable t) {

        logger.warn(t.getMessage(), t);

    }

}

 

这里最重要的是将closed设置成了true了,这样以后provider将不会向上游系统发送应答数据。当然,它还关闭了服务端的心跳。

而server.close(timeout)则主要是关闭通信资源,可以参看AbstractServer#close和NettyServer#doClose。

那么上图的B服务的consumer端(即对②③的调用)是如何关闭的?这个我们可以参看HeaderExchangeClient中的代码:

 

public void close() {

        // 关闭心跳

    doClose();

        // 关闭通讯资源,关闭后不能重新建立连接,也不能向下游发送请求

    channel.close();

}

 

public void close(int timeout) {

    doClose();

    channel.close(timeout);

}

 

同HeaderExchangeServer一样,HeaderExchangeClient的close方法也有两个,但DubboProtocol#destroy中调用的是不带timeout的这个close(和关闭provider时相反),dubbo的新版本改成调用有timeout的方法,拿最上面的服务调用来说,在B服务的provider对上游应答关闭之前,步骤②③理想情况下应该陆续完成,如果已经走到要关闭B服务的consumer了,说明B服务对上游服务(比如A服务)的应答和服务暴露早已关闭,这时候B服务关闭自己的consumer就可以暴力些了。但如果B服务自身内部有些调度任务在处理,并且对下游Dubbo服务有依赖,那么这种情况就比较复杂了,很难做到优雅停机。

 

为了在2.5.3的版本实现不设置重试也能优雅停机,我们需要在几个关键地方加上一些等待时间。

在Constants.java中加入四个常量:

/**

 * 为了让优雅停机的可用性更高,这里暴露出provider和consumer在优雅停机时的最小等待时间,单位毫秒

 * yizhenqiang 2017-12-07

 */

public static final String SHUTDOWN_PROVIDER_MIN_WAIT          = "provider.shutdown.min.wait";

public static final String SHUTDOWN_PROVIDER_MIN_WAIT_DEFAULT  = "3000";

public static final String SHUTDOWN_CONSUMER_MIN_WAIT          = "consumer.shutdown.min.wait";

public static final String SHUTDOWN_CONSUMER_MIN_WAIT_DEFAULT  = "2000";

 

修改ProtocolConfig.java的destroyAll()方法,加入第一个Provider的等待时间:

public static void destroyAll() {

    AbstractRegistryFactory.destroyAll();

 

    /**

     * 为了防止上面和注册中心断开后立马结束provider暴露的服务,这里等待一小段时间

     * yizhenqiang 2017-12-07

     */

    String providerMinTimeoutStr = ConfigUtils.getProperty(Constants.SHUTDOWN_PROVIDER_MIN_WAIT,

            Constants.SHUTDOWN_PROVIDER_MIN_WAIT_DEFAULT);

    Long providerMinTimeout;

    try {

        providerMinTimeout = Long.parseLong(providerMinTimeoutStr);

    } catch (NumberFormatException e) {

        providerMinTimeout = Long.parseLong(Constants.SHUTDOWN_PROVIDER_MIN_WAIT_DEFAULT);

    }

    try {

        TimeUnit.MILLISECONDS.sleep(providerMinTimeout);

    } catch (InterruptedException e) {

        logger.warn(e.getMessage(), e);

    }

 

    ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);

 

    for (String protocolName : loader.getLoadedExtensions()) {

        try {

            Protocol protocol = loader.getLoadedExtension(protocolName);

            if (protocol != null) {

                protocol.destroy();

            }

        } catch (Throwable t) {

            logger.warn(t.getMessage(), t);

        }

    }

}

 

因为我们使用dubbo协议,所以需要修改的是DubboInvoker.java:

先在DubboInvoker.java中加一个线程池属性,用于异步关闭client,例如:

/**

 * 为了做到多个接口(一个DubboInvoker对应一个接口)能优雅停机,这里对client的关闭

 * yizhenqiang 2017-12-08

 */

private static final ExecutorService closeClientPool = new ThreadPoolExecutor(0, 100, 5,

        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {

    @Override

    public Thread newThread(Runnable r) {

        return new Thread(r, "dubboInvokerClientClose");

    }

});

 

再修改DubboInvoker.java的destroy(),加入第二个等待时间:

public void destroy() {

    //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭

    if (super.isDestroyed()) {

        return;

    } else {

        //dubbo check ,避免多次关闭

        destroyLock.lock();

        try {

            if (super.isDestroyed()) {

                return;

            }

 

            super.destroy();

            if (invokers != null) {

                invokers.remove(this);

            }

 

            /**

             * 为了避免关闭多个DubboInvoker时都等待指定的最小时间,这里关闭client时采用异步方式

             * yizhenqiang 2017-12-08

             */

            try {

                closeClientPool.submit(new Runnable() {

                    @Override

                    public void run() {

                        /**

                         * 当consumer收到provider变动的消息后,上面已经将失效的provider移除了,但为了让正在进行中的请求能完成,

                         * 这里在下面关闭ExchangeClient前先等待一小段时间

                         * yizhenqiang 2017-12-07

                         */

                        String consumerMinTimeoutStr = ConfigUtils.getProperty(Constants.SHUTDOWN_CONSUMER_MIN_WAIT,

                                Constants.SHUTDOWN_CONSUMER_MIN_WAIT_DEFAULT);

                        Long consumerMinTimeout;

                        try {

                            consumerMinTimeout = Long.parseLong(consumerMinTimeoutStr);

 

                        } catch (NumberFormatException e) {

                            consumerMinTimeout = Long.parseLong(Constants.SHUTDOWN_CONSUMER_MIN_WAIT_DEFAULT);

                        }

 

                        try {

                            TimeUnit.MILLISECONDS.sleep(consumerMinTimeout);

                        } catch (InterruptedException e) {

                            logger.warn(e.getMessage(), e);

                        }

 

                        for (ExchangeClient client : clients) {

                            try {

                                client.close();

                            } catch (Throwable t) {

                                logger.warn(t.getMessage(), t);

                            }

                        }

                    }

                });

 

            } catch (Exception e) {

                logger.warn("提交client关闭任务异常," +  e.getMessage(), e);

            }

 

 

        } finally {

            destroyLock.unlock();

        }

    }

}

 

这样修改后,哪怕业务系统没设置重试机制,也能实现优雅停机(通过等待少许时间),如果想调整provider和consumer的等待时间,那么只需要在dubbo.properties中设置就行了:

provider.shutdown.min.wait=5000

<!--StartFragment--> <!--EndFragment-->

consumer.shutdown.min.wait=2000

  • 大小: 68 KB
0
0
分享到:
评论

相关推荐

    Apache Dubbo:Dubbo服务治理:限流与降级策略

    ### Apache Dubbo:Dubbo服务治理:限流与降级策略 #### 1. Dubbo服务治理概述 ##### 1.1 Dubbo服务治理的重要性 在微服务架构中,服务之间的交互频繁且复杂,特别是在高并发场景下,某些服务可能会因为请求量过...

    Dubbo源代码(2.8.4)

    本文将深入探讨Dubbo 2.8.4的源代码结构、核心组件及其工作原理。 1. **源代码结构**: Dubbo的源码结构清晰,主要包括以下几个模块: - `dubbo-common`:基础通用模块,包含各种工具类和公共接口。 - `dubbo-...

    Dubbo源代码(2.5.4)

    【Dubbo源代码(2.5.4)】是一份重要的开源项目资源,它包含了Dubbo框架在2.5.4版本的完整源代码。Dubbo是中国阿里巴巴公司贡献的高性能、轻量级的服务治理框架,它专注于服务调用、监控和服务治理。这个版本的源...

    Apache Dubbo:Dubbo高级特性:服务降级与熔断实战

    Apache Dubbo:Dubbo高级特性:服务降级与熔断实战 Dubbo是著名的RCP框架,文档内有干货,提供代码和可复现的命令,值得借鉴。

    Dubbo源代码分析之远程调用过程(2.5.4开发版)

    该文档分析了 Dubbo 框架中 RPC 调用的整个流程,并基于源代码按照执行 时序进行说明,源码版本为2.5.4开发版。 涉及的关键点包括:Invocation、Invoker、Directory、路由、负载均衡、集群容错、过滤器以及监控模块...

    Apache Dubbo:Dubbo服务治理:服务路由与动态配置

    ### Apache Dubbo:服务治理——服务路由与动态配置 #### 一、服务治理的重要性 在当前流行的微服务架构中,由于各个服务之间存在着频繁而复杂的交互,如何有效地管理和控制这些服务成为了确保整个系统稳定性和可...

    Apache Dubbo:Dubbo服务治理:服务熔断与超时重试

    ### Apache Dubbo:服务熔断与超时重试 #### 一、服务熔断基础 ##### 1.1 服务熔断的概念 服务熔断,作为一种重要的服务稳定性保障措施,在分布式系统中扮演着至关重要的角色。它的工作原理是,当某个服务节点...

    Apache Dubbo:Dubbo服务治理:负载均衡与容错机制

    ### Apache Dubbo:Dubbo服务治理:负载均衡与容错机制 #### 一、Dubbo服务治理的重要性 在现代微服务架构中,服务间的交互变得日益频繁和复杂,因此需要一种有效的方式来管理这些服务,以确保整个系统的稳定性和...

    Apache Dubbo:Dubbo监控与运维:服务性能分析

    ### Apache Dubbo:Dubbo监控与运维:服务性能分析 #### 一、Dubbo监控概述 ##### 1.1 Dubbo监控的重要性 在现代微服务架构中,由于服务之间存在着复杂的调用关系,任何单一服务的性能问题都有可能对整体系统稳定...

    dubbo源代码

    dubbo分布式服务框架,方便大家对分布式服务的学习,方便对dubbo的扩展

    dubbo2.8.4.jar

    如果使用dubbo遇到错误:com.alibaba.dubbo.remoting.RemotingException: Fail to decode request due to: RpcInvocation 请下载这个jar,替换掉你项目中的那个jar,应该可以解决。

    Apache Dubbo:Dubbo高级特性:服务版本与分组

    - **配置文件方式**:在服务提供者和消费者的配置文件中,使用 `&lt;dubbo:service&gt;` 或 `&lt;dubbo:reference&gt;` 标签的 `version` 属性来指定服务版本。 - **服务提供者配置**: ```xml &lt;dubbo:service interface=...

    Apache Dubbo:Dubbo核心概念:服务提供者与消费者

    ### Apache Dubbo:服务提供者与消费者核心概念详解 #### 一、Apache Dubbo概览 **Apache Dubbo**是一款高性能、轻量级的开源微服务框架,最初由阿里巴巴内部开发并在2008年开始使用,随后在2011年开源。自2017年...

    dubbo示例代码dubbo-sample

    【Dubbo 示例代码详解】 Dubbo 是阿里巴巴开源的一款高性能、轻量级的Java服务治理框架,它主要提供了RPC(远程过程调用)服务,并且包含了服务注册与发现、负载均衡、容错处理、监控等全面的服务治理功能。本示例...

    Spring+mybatis+dubbo整合源代码及jar包

    通过Spring_Mybatis_Dubbo_Jars.zip和Spring_Mybatis_Dubbo2.zip这两个压缩包,你可以获取到整合所需的所有依赖包和源代码,从而快速搭建起这个框架。这将帮助开发者更快地理解并掌握这三大框架的协同工作方式,提高...

    dubbo xsd的支持

    例如,如果没有`dubbo.xsd`,XML解析器就无法识别像`&lt;dubbo:application&gt;`、`&lt;dubbo:service&gt;`或`&lt;dubbo:reference&gt;`这样的Dubbo特定标签。此时,我们需要引入`dubbo.xsd`文件到项目的类路径中,或者在XML配置文件中...

    dubbo server+client 完整代码

    【Dubbo Server+Client 完整代码】是一个深入学习和实践Dubbo框架的实例项目,它涵盖了服务端(Server)和服务消费者端(Client)的完整实现。Dubbo是阿里巴巴开源的一款高性能、轻量级的Java远程服务框架,它强调了...

    dubbo从入门到精通教程

    "dubbo从入门到精通教程" dubbo是一种分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案。它是阿里巴巴SOA服务化治理方案的核心框架,每天为2000+个服务提供300000000+次访问量支持,并被广泛应用于...

    dubbo2.8.4源代码

    【标题】"Dubbo 2.8.4 源代码" 涵盖了分布式服务框架的核心技术,是阿里巴巴开源的一款高性能、轻量级的服务治理框架。它为开发者提供了微服务开发所需的诸多功能,包括服务注册与发现、负载均衡、调用链路监控等。...

    Dubbo工程demo

    传统工程改造成Dubbo工程 dubbodemo-parent : 父项目,定义jar包版本号,聚合所有maven项目(module)等. dubbodemo-facede : 定义接口,这个项目是要打成jar包分别被dubbodemo-service和dubbodemo-web引用的 dubbodemo-...

Global site tag (gtag.js) - Google Analytics