`

Dubbo不能优雅停机,导致停止服务的时候,业务掉单

 
阅读更多

Dubbo 优雅停机修改方案

 

1.      服务端不能优雅停机的原因:

NettyServer在构造函数中会调用

ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))

方法将handler进行包装,包装成MultiMessageHandler的一个对象。在下面红色代码中会判断handler是否是WrappedChannelHandler对象,只有是的时候才会对executor对象复值。因为MultiMessageHandler对象不是WrappedChannelHandler的子类,所以executor为空。

NettyServerclose方法被调用的时候,会调用ExecutorUtil.gracefulShutdown方法,gracefulShutdown方法只有executor不为空时才会等待线程池关闭

 

public abstract class AbstractServer extends AbstractEndpoint implements Server {

   public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {

        super(url, handler);

        localAddress = getUrl().toInetSocketAddress();

        String host = url.getParameter(Constants.ANYHOST_KEY, false)

                        || NetUtils.isInvalidLocalHost(getUrl().getHost())

                        ? NetUtils.ANYHOST : getUrl().getHost();

        bindAddress = new InetSocketAddress(host, getUrl().getPort());

        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);

        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        try {

            doOpen();

            if (logger.isInfoEnabled()) {

                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

            }

        } catch (Throwable t) {

            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()

                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);

        }

        if (handler instanceof WrappedChannelHandler ){

            executor = ((WrappedChannelHandler)handler).getExecutor();

        }

    }

  public void close(int timeout) {

        ExecutorUtil.gracefulShutdown(executor ,timeout);

        close();

    }

}

 

 

2. 服务端改动:

修改了AbstractServer类的红色部分

 public abstract class AbstractServer extends AbstractEndpoint implements Server {

 

    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);

 

    private InetSocketAddress              localAddress;

 

    private InetSocketAddress              bindAddress;

 

    private int                            accepts;

 

    private int                            idleTimeout = 600; //600 seconds

 

    protected static final String SERVER_THREAD_POOL_NAME  ="DubboServerHandler";

 

    ExecutorService executor;

 

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {

        super(url, handler);

        localAddress = getUrl().toInetSocketAddress();

        String host = url.getParameter(Constants.ANYHOST_KEY, false)

                || NetUtils.isInvalidLocalHost(getUrl().getHost())

                ? NetUtils.ANYHOST : getUrl().getHost();

        bindAddress = new InetSocketAddress(host, getUrl().getPort());

        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);

        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        try {

            doOpen();

            if (logger.isInfoEnabled()) {

                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

            }

        } catch (Throwable t) {

            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()

                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);

        }

        try {

            recycleToGetExecutor(handler);

        } catch (Exception e) {

            logger.error("recycle to get executor failed");

        }

    }

 

    private void recycleToGetExecutor(ChannelHandler handler) throws NoSuchFieldException, IllegalAccessException {

        if(handler == null)

        {

            return;

        }

 

        if (handler instanceof WrappedChannelHandler ){

            executor = ((WrappedChannelHandler)handler).getExecutor();

        }else if(handler instanceof AbstractChannelHandlerDelegate){

            Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler");

            field.setAccessible(true);

            recycleToGetExecutor((ChannelHandler) field.get(handler));

        }

    }

 

    protected abstract void doOpen() throws Throwable;

 

    protected abstract void doClose() throws Throwable;

 

    public void reset(URL url) {

        if (url == null) {

            return;

        }

        try {

            if (url.hasParameter(Constants.ACCEPTS_KEY)) {

                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);

                if (a > 0) {

                    this.accepts = a;

                }

            }

        } catch (Throwable t) {

            logger.error(t.getMessage(), t);

        }

        try {

            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {

                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);

                if (t > 0) {

                    this.idleTimeout = t;

                }

            }

        } catch (Throwable t) {

            logger.error(t.getMessage(), t);

        }

        try {

            if (url.hasParameter(Constants.THREADS_KEY)

                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {

                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;

                int threads = url.getParameter(Constants.THREADS_KEY, 0);

                int max = threadPoolExecutor.getMaximumPoolSize();

                int core = threadPoolExecutor.getCorePoolSize();

                if (threads > 0 && (threads != max || threads != core)) {

                    if (threads < core) {

                        threadPoolExecutor.setCorePoolSize(threads);

                        if (core == max) {

                            threadPoolExecutor.setMaximumPoolSize(threads);

                        }

                    } else {

                        threadPoolExecutor.setMaximumPoolSize(threads);

                        if (core == max) {

                            threadPoolExecutor.setCorePoolSize(threads);

                        }

                    }

                }

            }

        } catch (Throwable t) {

            logger.error(t.getMessage(), t);

        }

        super.setUrl(getUrl().addParameters(url.getParameters()));

    }

 

    public void send(Object message, boolean sent) throws RemotingException {

        Collection<Channel> channels = getChannels();

        for (Channel channel : channels) {

            if (channel.isConnected()) {

                channel.send(message, sent);

            }

        }

    }

 

    public void close() {

        if (logger.isInfoEnabled()) {

            logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());

        }

        ExecutorUtil.shutdownNow(executor ,100);

        try {

            super.close();

        } catch (Throwable e) {

            logger.warn(e.getMessage(), e);

        }

        try {

            doClose();

        } catch (Throwable e) {

            logger.warn(e.getMessage(), e);

        }

    }

 

    public void close(int timeout) {

        ExecutorUtil.gracefulShutdown(executor ,timeout);

        close();

    }

 

    public InetSocketAddress getLocalAddress() {

        return localAddress;

    }

 

    public InetSocketAddress getBindAddress() {

        return bindAddress;

    }

 

    public int getAccepts() {

        return accepts;

    }

 

    public int getIdleTimeout() {

        return idleTimeout;

    }

 

    @Override

    public void connected(Channel ch) throws RemotingException {

        Collection<Channel> channels = getChannels();

        if (accepts > 0 && channels.size() > accepts) {

            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);

            ch.close();

            return;

        }

        super.connected(ch);

    }

 

    @Override

    public void disconnected(Channel ch) throws RemotingException {

        Collection<Channel> channels = getChannels();

        if (channels.size() == 0){

            logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");

        }

        super.disconnected(ch);

    }

 

}

 

3.   客户端不能优雅停机的原因:

DubboInvoker停机的时候直接调用了client.close()方法应该调用client.close(long timeout)方法

public class DubboInvoker<T> extends AbstractInvoker<T> {

public void destroy() {

        //防止client被关闭多次.connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭

        if (super.isDestroyed()){

            return ;

        } else {

            //dubbo check ,避免多次关闭

            destroyLock.lock();

            try{

                if (super.isDestroyed()){

                    return ;

                }

                super.destroy();

                if (invokers != null){

                    invokers.remove(this);

                }

                for (ExchangeClient client : clients) {

                    try {

                        client.close();

                    } catch (Throwable t) {

                        logger.warn(t.getMessage(), t);

                    }

                }

               

            }finally {

                destroyLock.unlock();

            }

        }

}

}

 

HeaderExchangeChannel类中判断HeaderExchangeChannel.this是否在DefaultFuture中。实际上在处理请求的时候是将HeaderExchangeChannel对象中channel成员变量放到DefaultFuture中的

final class HeaderExchangeChannel implements ExchangeChannel {

  public ResponseFuture request(Object request, int timeout) throws RemotingException {

        if (closed) {

            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");

        }

        // create request.

        Request req = new Request();

        req.setVersion("2.0.0");

        req.setTwoWay(true);

        req.setData(request);

        DefaultFuture future = new DefaultFuture(channel, req, timeout);

        try{

            channel.send(req);

        }catch (RemotingException e) {

            future.cancel();

            throw e;

        }

        return future;

}

 

public void close(int timeout) {

        if (closed) {

            return;

        }

        closed = true;

        if (timeout > 0) {

            long start = System.currentTimeMillis();

            while (DefaultFuture.hasFuture(HeaderExchangeChannel.this)

                    && System.currentTimeMillis() - start < timeout) {

                try {

                    Thread.sleep(10);

                } catch (InterruptedException e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        }

        close();

    }

}

 

4.   客户端的修改

 

修改下面高量代码:

public class DubboInvoker<T> extends AbstractInvoker<T> {

    public void destroy() {

        //防止client被关闭多次.connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭

        if (super.isDestroyed()){

            return ;

        } else {

            //dubbo check ,避免多次关闭

            destroyLock.lock();

            try{

                if (super.isDestroyed()){

                    return ;

                }

                super.destroy();

                if (invokers != null){

                    invokers.remove(this);

                }

                for (ExchangeClient client : clients) {

                    try {

                        logger.info("start to shutdown:" + getServerShutdownTimeout());

                        client.close(getServerShutdownTimeout());

                    } catch (Throwable t) {

                        logger.warn(t.getMessage(), t);

                    }

                }

 

            }finally {

                destroyLock.unlock();

            }

        }

    }

 

    protected static int getServerShutdownTimeout() {

        int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;

        String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);

        if (value != null && value.length() > 0) {

            try{

                timeout = Integer.parseInt(value);

            }catch (Exception e) {

            }

        } else {

            value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);

            if (value != null && value.length() > 0) {

                try{

                    timeout = Integer.parseInt(value) * 1000;

                }catch (Exception e) {

                }

            }

        }

 

        return timeout;

    }

}

 

final class HeaderExchangeChannel implements ExchangeChannel {

     // graceful close

    public void close(int timeout) {

        if (closed) {

            return;

        }

        closed = true;

        if (timeout > 0) {

            long start = System.currentTimeMillis();

            while (DefaultFuture.hasFuture(channel)

                    && System.currentTimeMillis() - start < timeout) {

                try {

                    Thread.sleep(10);

                } catch (InterruptedException e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        }

        close();

    }

}

 

 

注意服务端和客户端都需要加上配置dubbo.service.shutdown.wait=60000,设定停机等待时间

分享到:
评论
2 楼 无红墙 2017-08-08  
另一种修改,请参考:
https://github.com/taige/dubbo/commit/ed08ffac5cfacd0d226299ce7f7633057620bec4
1 楼 fish_no7 2016-03-16  

if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }

你好,请问NettyServer的话默认
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

wrap的都是MultiMessageHandler,包括minaServer

他在优雅停机的时候为什么又要是WrappedChannelHandler 的子类才给优雅停机呢,是BUG么

相关推荐

    Dubbo超时机制导致的雪崩连接

    2. **配置不一致**:DubboConsumer设置的超时时间为91毫秒,而DubboProvider的超时时间设置为600毫秒,这之间的差异可能导致服务调用出现错误。 3. **雪崩效应触发**:当DubboConsumer尝试调用DubboProvider获取商品...

    基于zookeeper 监控dubbo provider 下线,提供dubbo 服务下线 邮箱预警

    本项目结合两者,实现了一个针对`Dubbo`服务提供者(`Provider`)下线的监控系统,并提供了邮件预警功能,确保系统能够及时发现并处理服务异常情况,减少因服务不可用导致的业务中断。 1. **Zookeeper在Dubbo中的作用...

    服务治理工具dubbo

    1. **大型分布式系统**:在电商、金融、社交等业务场景中,Dubbo可以帮助构建大规模、高并发的分布式服务架构。 2. **微服务改造**:在进行微服务化改造时,Dubbo作为服务治理工具,能够有效管理和协调各个独立服务...

    dubbo注册中心服务ip和实际服务提供者ip不一致问题

    dubbo注册中心服务ip和实际服务提供者ip不一致问题 网上收集的资料

    Dubbo 分布式服务架构

    Dubbo分布式服务架构,对于研究大型Web服务器的并发技术的同学们有帮助。

    dubbo-admin:dubbo服务监控

    【标题】"dubbo-admin:dubbo服务监控" 指的是Dubbo生态系统中的一个关键组件,用于管理和监控Dubbo服务。Dubbo是阿里巴巴开源的一个高性能、轻量级的服务治理框架,广泛应用于分布式系统中。它提供了一整套服务治理...

    dubbo拆分服务实例.rar

    本实例将深入探讨如何利用Dubbo进行服务的拆分,通过"用户服务"(dubbo-user)和"订单服务"(dubbo-order)两个具体的示例,揭示服务拆分的实施步骤和关键点。 1. 服务拆分原则 服务拆分的目标是提高系统的可扩展性...

    dubbo-api服务

    Dubbo-api服务则是这个框架中实现业务逻辑的部分,它定义了服务的接口,并提供了服务的实现。开发者可以通过定义服务接口,编写服务提供者和消费者代码,实现服务的发布和调用。 在"下载即运行"的概念下,dubbo-api...

    Dubbo无法访问远程Zookeeper已注册服务的问题解决方案

    在使用Dubbo框架时,我们可能会遇到以下情况:当Zookeeper、Dubbo-admin、生产者和消费者都在内网中的时候,生产者的生产和消费是没有问题的。然而,当我们将Zookeeper和生产者放到远程服务器上,然后消费者在访问...

    dubbo本地测试服务

    在IT行业中,Dubbo是一个广泛使用的高性能Java RPC框架,它使得服务间的调用变得简单而高效。本主题聚焦于“dubbo本地测试服务”,旨在帮助开发者在本地环境中进行服务提供者连接、数据发送测试以及接口调试,从而...

    dubbo启动和停止脚本

    - 发送信号给Dubbo服务进程,如发送`SIGTERM`或`SIGKILL`信号来优雅地关闭服务或强制终止服务。 - 清理资源,例如释放端口、删除临时文件等。 **目录结构的重要性** 在描述中提到了文档目录应符合`bin`、`log`、`...

    dubbodubbo.zip

    3. **服务测试**:Dubbo支持Mock服务,可以在不依赖真实服务的情况下进行单元测试,提高了开发效率。 4. **微服务架构**:在微服务架构中,Dubbo可以帮助拆分大型系统为多个小型、独立的服务,每个服务都可以独立...

    dubbo资源 dubbo-admin dubbo demo

    【标题】"dubbo资源 dubbo-admin dubbo demo" 提供的是关于Apache Dubbo的相关素材,主要包括了Dubbo-admin的管理和示例项目。Dubbo是一个高性能、轻量级的开源Java RPC框架,它提供了丰富的服务治理功能,是阿里...

    dubbo服务和消费

    首先,我们要明确的是,Dubbo的核心概念是服务,它将业务逻辑封装为可远程调用的服务。服务提供者(Provider)是暴露服务的一方,它将本地实现注册到服务注册中心,使得服务消费者能够发现并调用。服务消费者...

    dubbo服务注册到eureka.zip

    - 引入 Eureka 客户端依赖,使 Dubbo 服务能与 Eureka 通信。 - 在 Dubbo 服务提供者的配置文件中,添加 Eureka 相关配置,如应用名、Eureka 服务器地址等。 - 修改服务提供者的启动类,加入 @EnableEurekaClient...

    dubbo 服务提供方 服务消费方

    在分布式系统领域,Dubbo是一个广泛使用的开源Java框架,它主要设计用于构建高性能、轻量级的服务治理平台。本篇文章将深入探讨Dubbo服务提供方(Provider)和服务消费方(Consumer)的核心概念、实现原理以及如何...

    dubbo示例代码dubbo-sample

    通过`dubbo-sample`代码,开发者不仅可以学习到Dubbo的基本用法,还能了解到服务治理的实践细节,从而更好地在实际项目中应用Dubbo,构建高效稳定的分布式系统。在实际开发中,可以根据需求选择合适的模块进行学习和...

    dubbo上传文件+oss上传文件服务.zip

    在IT行业中,分布式服务框架Dubbo和对象存储服务OSS(Object Storage Service)是两种非常重要的技术。本项目结合了这两个技术,实现了文件的本地上传以及通过Dubbo接口将文件发送到远程服务,并最终存储在OSS上。...

    dubbo 分布式服务框架 开发者学习文档 PDF格式

    《Dubbo分布式服务框架开发者学习文档》是一份深入解析Dubbo技术体系的重要资料,它涵盖了Dubbo的核心功能和实现机制,对于理解并掌握这个流行的Java分布式服务框架具有极高的价值。Dubbo,作为阿里巴巴开源的一款高...

    Dubbo服务注册与发现.doc

    然而,业务拆分后发现许多共用的模块很难复用起来,这时候分布式服务架构登场了,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心。 RPC(Remote Procedure Call)是指远程过程调用,是一种进程间通信...

Global site tag (gtag.js) - Google Analytics