Dubbo 优雅停机修改方案
1. 服务端不能优雅停机的原因:
NettyServer在构造函数中会调用
ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))
方法将handler进行包装,包装成MultiMessageHandler的一个对象。在下面红色代码中会判断handler是否是WrappedChannelHandler对象,只有是的时候才会对executor对象复值。因为MultiMessageHandler对象不是WrappedChannelHandler的子类,所以executor为空。
当NettyServer的close方法被调用的时候,会调用ExecutorUtil.gracefulShutdown方法,gracefulShutdown方法只有executor不为空时才会等待线程池关闭
public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); } } public void close(int timeout) { ExecutorUtil.gracefulShutdown(executor ,timeout); close(); } } |
2. 服务端改动:
修改了AbstractServer类的红色部分
public abstract class AbstractServer extends AbstractEndpoint implements Server {
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout = 600; //600 seconds
protected static final String SERVER_THREAD_POOL_NAME ="DubboServerHandler";
ExecutorService executor;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } try { recycleToGetExecutor(handler); } catch (Exception e) { logger.error("recycle to get executor failed"); } }
private void recycleToGetExecutor(ChannelHandler handler) throws NoSuchFieldException, IllegalAccessException { if(handler == null) { return; }
if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); }else if(handler instanceof AbstractChannelHandlerDelegate){ Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler"); field.setAccessible(true); recycleToGetExecutor((ChannelHandler) field.get(handler)); } }
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
public void reset(URL url) { if (url == null) { return; } try { if (url.hasParameter(Constants.ACCEPTS_KEY)) { int a = url.getParameter(Constants.ACCEPTS_KEY, 0); if (a > 0) { this.accepts = a; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) { int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0); if (t > 0) { this.idleTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.THREADS_KEY) && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; int threads = url.getParameter(Constants.THREADS_KEY, 0); int max = threadPoolExecutor.getMaximumPoolSize(); int core = threadPoolExecutor.getCorePoolSize(); if (threads > 0 && (threads != max || threads != core)) { if (threads < core) { threadPoolExecutor.setCorePoolSize(threads); if (core == max) { threadPoolExecutor.setMaximumPoolSize(threads); } } else { threadPoolExecutor.setMaximumPoolSize(threads); if (core == max) { threadPoolExecutor.setCorePoolSize(threads); } } } } } catch (Throwable t) { logger.error(t.getMessage(), t); } super.setUrl(getUrl().addParameters(url.getParameters())); }
public void send(Object message, boolean sent) throws RemotingException { Collection<Channel> channels = getChannels(); for (Channel channel : channels) { if (channel.isConnected()) { channel.send(message, sent); } } }
public void close() { if (logger.isInfoEnabled()) { logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } ExecutorUtil.shutdownNow(executor ,100); try { super.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { doClose(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } }
public void close(int timeout) { ExecutorUtil.gracefulShutdown(executor ,timeout); close(); }
public InetSocketAddress getLocalAddress() { return localAddress; }
public InetSocketAddress getBindAddress() { return bindAddress; }
public int getAccepts() { return accepts; }
public int getIdleTimeout() { return idleTimeout; }
@Override public void connected(Channel ch) throws RemotingException { Collection<Channel> channels = getChannels(); if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts); ch.close(); return; } super.connected(ch); }
@Override public void disconnected(Channel ch) throws RemotingException { Collection<Channel> channels = getChannels(); if (channels.size() == 0){ logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now."); } super.disconnected(ch); }
} |
3. 客户端不能优雅停机的原因:
DubboInvoker停机的时候直接调用了client.close()方法应该调用client.close(long timeout)方法
public class DubboInvoker<T> extends AbstractInvoker<T> { public void destroy() { //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭 if (super.isDestroyed()){ return ; } else { //dubbo check ,避免多次关闭 destroyLock.lock(); try{ if (super.isDestroyed()){ return ; } super.destroy(); if (invokers != null){ invokers.remove(this); } for (ExchangeClient client : clients) { try { client.close(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
}finally { destroyLock.unlock(); } } } } |
HeaderExchangeChannel类中判断HeaderExchangeChannel.this是否在DefaultFuture中。实际上在处理请求的时候是将HeaderExchangeChannel对象中channel成员变量放到DefaultFuture中的
final class HeaderExchangeChannel implements ExchangeChannel { public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; }
public void close(int timeout) { if (closed) { return; } closed = true; if (timeout > 0) { long start = System.currentTimeMillis(); while (DefaultFuture.hasFuture(HeaderExchangeChannel.this) && System.currentTimeMillis() - start < timeout) { try { Thread.sleep(10); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } close(); } } |
4. 客户端的修改
修改下面高量代码:
public class DubboInvoker<T> extends AbstractInvoker<T> { public void destroy() { //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭 if (super.isDestroyed()){ return ; } else { //dubbo check ,避免多次关闭 destroyLock.lock(); try{ if (super.isDestroyed()){ return ; } super.destroy(); if (invokers != null){ invokers.remove(this); } for (ExchangeClient client : clients) { try { logger.info("start to shutdown:" + getServerShutdownTimeout()); client.close(getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
}finally { destroyLock.unlock(); } } }
protected static int getServerShutdownTimeout() { int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT; String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY); if (value != null && value.length() > 0) { try{ timeout = Integer.parseInt(value); }catch (Exception e) { } } else { value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY); if (value != null && value.length() > 0) { try{ timeout = Integer.parseInt(value) * 1000; }catch (Exception e) { } } }
return timeout; } } |
final class HeaderExchangeChannel implements ExchangeChannel { // graceful close public void close(int timeout) { if (closed) { return; } closed = true; if (timeout > 0) { long start = System.currentTimeMillis(); while (DefaultFuture.hasFuture(channel) && System.currentTimeMillis() - start < timeout) { try { Thread.sleep(10); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } close(); } } |
注意服务端和客户端都需要加上配置dubbo.service.shutdown.wait=60000,设定停机等待时间
相关推荐
2. **配置不一致**:DubboConsumer设置的超时时间为91毫秒,而DubboProvider的超时时间设置为600毫秒,这之间的差异可能导致服务调用出现错误。 3. **雪崩效应触发**:当DubboConsumer尝试调用DubboProvider获取商品...
本项目结合两者,实现了一个针对`Dubbo`服务提供者(`Provider`)下线的监控系统,并提供了邮件预警功能,确保系统能够及时发现并处理服务异常情况,减少因服务不可用导致的业务中断。 1. **Zookeeper在Dubbo中的作用...
1. **大型分布式系统**:在电商、金融、社交等业务场景中,Dubbo可以帮助构建大规模、高并发的分布式服务架构。 2. **微服务改造**:在进行微服务化改造时,Dubbo作为服务治理工具,能够有效管理和协调各个独立服务...
dubbo注册中心服务ip和实际服务提供者ip不一致问题 网上收集的资料
Dubbo分布式服务架构,对于研究大型Web服务器的并发技术的同学们有帮助。
【标题】"dubbo-admin:dubbo服务监控" 指的是Dubbo生态系统中的一个关键组件,用于管理和监控Dubbo服务。Dubbo是阿里巴巴开源的一个高性能、轻量级的服务治理框架,广泛应用于分布式系统中。它提供了一整套服务治理...
本实例将深入探讨如何利用Dubbo进行服务的拆分,通过"用户服务"(dubbo-user)和"订单服务"(dubbo-order)两个具体的示例,揭示服务拆分的实施步骤和关键点。 1. 服务拆分原则 服务拆分的目标是提高系统的可扩展性...
Dubbo-api服务则是这个框架中实现业务逻辑的部分,它定义了服务的接口,并提供了服务的实现。开发者可以通过定义服务接口,编写服务提供者和消费者代码,实现服务的发布和调用。 在"下载即运行"的概念下,dubbo-api...
在使用Dubbo框架时,我们可能会遇到以下情况:当Zookeeper、Dubbo-admin、生产者和消费者都在内网中的时候,生产者的生产和消费是没有问题的。然而,当我们将Zookeeper和生产者放到远程服务器上,然后消费者在访问...
在IT行业中,Dubbo是一个广泛使用的高性能Java RPC框架,它使得服务间的调用变得简单而高效。本主题聚焦于“dubbo本地测试服务”,旨在帮助开发者在本地环境中进行服务提供者连接、数据发送测试以及接口调试,从而...
- 发送信号给Dubbo服务进程,如发送`SIGTERM`或`SIGKILL`信号来优雅地关闭服务或强制终止服务。 - 清理资源,例如释放端口、删除临时文件等。 **目录结构的重要性** 在描述中提到了文档目录应符合`bin`、`log`、`...
3. **服务测试**:Dubbo支持Mock服务,可以在不依赖真实服务的情况下进行单元测试,提高了开发效率。 4. **微服务架构**:在微服务架构中,Dubbo可以帮助拆分大型系统为多个小型、独立的服务,每个服务都可以独立...
【标题】"dubbo资源 dubbo-admin dubbo demo" 提供的是关于Apache Dubbo的相关素材,主要包括了Dubbo-admin的管理和示例项目。Dubbo是一个高性能、轻量级的开源Java RPC框架,它提供了丰富的服务治理功能,是阿里...
首先,我们要明确的是,Dubbo的核心概念是服务,它将业务逻辑封装为可远程调用的服务。服务提供者(Provider)是暴露服务的一方,它将本地实现注册到服务注册中心,使得服务消费者能够发现并调用。服务消费者...
- 引入 Eureka 客户端依赖,使 Dubbo 服务能与 Eureka 通信。 - 在 Dubbo 服务提供者的配置文件中,添加 Eureka 相关配置,如应用名、Eureka 服务器地址等。 - 修改服务提供者的启动类,加入 @EnableEurekaClient...
在分布式系统领域,Dubbo是一个广泛使用的开源Java框架,它主要设计用于构建高性能、轻量级的服务治理平台。本篇文章将深入探讨Dubbo服务提供方(Provider)和服务消费方(Consumer)的核心概念、实现原理以及如何...
通过`dubbo-sample`代码,开发者不仅可以学习到Dubbo的基本用法,还能了解到服务治理的实践细节,从而更好地在实际项目中应用Dubbo,构建高效稳定的分布式系统。在实际开发中,可以根据需求选择合适的模块进行学习和...
在IT行业中,分布式服务框架Dubbo和对象存储服务OSS(Object Storage Service)是两种非常重要的技术。本项目结合了这两个技术,实现了文件的本地上传以及通过Dubbo接口将文件发送到远程服务,并最终存储在OSS上。...
《Dubbo分布式服务框架开发者学习文档》是一份深入解析Dubbo技术体系的重要资料,它涵盖了Dubbo的核心功能和实现机制,对于理解并掌握这个流行的Java分布式服务框架具有极高的价值。Dubbo,作为阿里巴巴开源的一款高...
然而,业务拆分后发现许多共用的模块很难复用起来,这时候分布式服务架构登场了,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心。 RPC(Remote Procedure Call)是指远程过程调用,是一种进程间通信...