`
huangjinjin520
  • 浏览: 71079 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

dubbo源码学习(四):暴露服务的过程

阅读更多
dubbo采用的nio异步的通信,通信协议默认为 netty,当然也可以选择 mina,grizzy。在服务端(provider)在启动时主要是开启netty监听,在zookeeper上注册服务节点,处理消费者请求,返回处理后的消息给消费者,消费者使用服务时主要是订阅服务的节点,监听zookeeper节点目录,服务端的变化时zookeeper会推送给消费者,消费者重新缓存服务地址等。服务者、消费者、zookeeper三者之间都是长连接。

下面看dubbo源码来看服务暴露的过程,服务暴露的入口为:com.alibaba.dubbo.config.ServiceConfig#export 方法,代码如下:

//是否延时暴露 
        if (delay != null && delay > 0) { 
            Thread thread = new Thread(new Runnable() { 
                public void run() { 
                    try { 
                        Thread.sleep(delay); 
                    } catch (Throwable e) { 
                    } 
                    doExport(); 
                } 
            }); 
            thread.setDaemon(true); 
            thread.setName("DelayExportServiceThread"); 
            thread.start(); 
        } else { 
            //不延时暴露,则直接暴露 
            doExport(); 
        } 
上在代码无论是延时暴露或直接暴露调用的方法是:doExport(),doExport会对解析完的配置再做一次检查,核心代码大家可以查看dubbo的源码,下面列出一小部分
/*
            检查默认设置,如果xml中没有配置<dubbo:provider
            主要是从系统环境变量中寻找是否有相应的provider的配置
*/ 
        checkDefault(); 
        //下面设置的内容如果没有配置<dubbo:provider时基本上都是Null 
        if (provider != null) { 
            if (application == null) { 
                application = provider.getApplication(); 
            } 
            if (module == null) { 
                module = provider.getModule(); 
            } 
            if (registries == null) { 
                registries = provider.getRegistries(); 
            } 
            if (monitor == null) { 
                monitor = provider.getMonitor(); 
            } 
            if (protocols == null) { 
                protocols = provider.getProtocols(); 
            } 
        } 
        if (module != null) { 
            //registries一般都会配置 
            if (registries == null) { 
                registries = module.getRegistries(); 
            } 
            if (monitor == null) { 
                monitor = module.getMonitor(); 
            } 
        } 
        if (application != null) { 
            //application一般也会配置 
            if (registries == null) { 
                registries = application.getRegistries(); 
            } 
            if (monitor == null) { 
                monitor = application.getMonitor(); 
            } 
        } 
        //是否泛化调用 
        if (ref instanceof GenericService) { 
            interfaceClass = GenericService.class; 
            if (StringUtils.isEmpty(generic)) { 
                generic = Boolean.TRUE.toString(); 
            } 
        } else { 
            try { 
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() 
                        .getContextClassLoader()); 
            } catch (ClassNotFoundException e) { 
                throw new IllegalStateException(e.getMessage(), e); 
            } 
            /*
                检查即将暴露的接口的方法配置,检查方法是否在接口中存在
                一般不会配置所以一般情况下methods为null
                <dubbo:service  > <dubbo:method /> </dubbo:serivce>
             */ 
            checkInterfaceAndMethods(interfaceClass, methods); 
            /*
                检查接口的引用不为空,并且必须实现的是要暴露的接口
             */ 
            checkRef(); 
            generic = Boolean.FALSE.toString(); 
        } 

所有的检查通过之后,会调用 :com.alibaba.dubbo.config.ServiceConfig#doExportUrls
/*
            将注册协议转化成url
            registry://45.119.68.23:2181/com.alibaba.dubbo.registry.RegistryService?
            application=test-dubbo&dubbo=2.5.3&pid=7648&registry=zookeeper×tamp=1462349748801
         */ 
        List<URL> registryURLs = loadRegistries(true); 
        //配置多通信协议时,都进行暴露 
        for (ProtocolConfig protocolConfig : protocols) { 
            doExportUrlsFor1Protocol(protocolConfig, registryURLs); 
        } 
doExportUrlsFor1Protocol中主要将所有的配置转化成map,然后将map转化成dubbo的统一URL,最终暴露的dubbo服务也就是这个统一的url,这个url也会注册到zookeeper的节点上,部分代码如下:
/*
    将不为null的配置对象中的属性设置到 map 中
    即将 xml 配置文件中的配置设置的值全转化成为map
    {side=provider, application=alijk-dubbo, accepts=1000,
        dubbo=2.5.3, threads=100, pid=7236, interface=cn.eoncloud.account.sdk.export.AccountService,
        threadpool=fixed, version=1.0.0, timeout=500, anyhost=true, timestamp=1462347843960}
*/ 
appendParameters(map, application); 
appendParameters(map, module); 
appendParameters(map, provider, Constants.DEFAULT_KEY); 
appendParameters(map, protocolConfig); 
appendParameters(map, this); 
......
/*
    将配置信息转化成 url ,主要根据之前map里的数据组装成url
    调用 URL#buildString方法
    dubbo://10.6.13.137:9998/cn.eoncloud.account.sdk.export.AccountService
    ?accepts=1000&anyhost=true&application=test-dubbo&dubbo=2.5.3
    &interface=cn.eoncloud.account.sdk.export.AccountService
    &methods=getAccountName,getAllTest&pid=7236&revision=1.0.0&side=provider
    &threadpool=fixed&threads=100&timeout=500×tamp=1462347843960&version=1.0.0
*/ 
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); 
 
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 
        .hasExtension(url.getProtocol())) { 
    url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 
            .getExtension(url.getProtocol()).getConfigurator(url).configure(url); 

...... 
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 
//com.alibaba.dubbo.registry.integration.RegistryProtocol#export 即将进行暴露 
Exporter<?> exporter = protocol.export(invoker); 
上面的代码核心暴露的一行代码为:protocol.export(invoker); 这个protocol的值为:RegistryProtocol,也就是暴露会跳到:RegistryProtocol.exprot中去处理,RegistryProtocol.exprot主要做两件事情:
1、开启netty服务端 。
2、创建zookeeper服务节点。
下面来看RegistryProtocol.export方法,代码如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 
        //export invoker doLocalExport调用dubboProtocol.export开启netty服务监听 
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 
        //registry provider 
        final Registry registry = getRegistry(originInvoker); 
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 
        //调用zodoRegister的doRegister 创建zookeeper的服务节点 
        registry.register(registedProviderUrl); 
        // 订阅override数据 
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); 
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 
        //订阅 
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 
        //保证每次export都返回一个新的exporter实例 
        return new Exporter<T>() { 
            public Invoker<T> getInvoker() { 
                return exporter.getInvoker(); 
            } 
            public void unexport() { 
                try { 
                    exporter.unexport(); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
                try { 
                    registry.unregister(registedProviderUrl); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
                try { 
                    overrideListeners.remove(overrideSubscribeUrl); 
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 
                } catch (Throwable t) { 
                    logger.warn(t.getMessage(), t); 
                } 
            } 
        }; 
    } 
上面的代码里有一段特别重要,关键性的代码在doLocalExport中:
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); 
//此处protol为dubboProtocol 
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); 
从上面的代码中可以看到会调用dubboProtocol的export对服务进行暴露,这个export最终目的就是开启netty的监听,下面来看dubbo是如何一步一步开启netty的
private void openServer(URL url) { 
       // find server. ip:port 
       String key = url.getAddress(); 
       //client 也可以暴露一个只有server可以调用的服务。 
       boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); 
       if (isServer) { 
        ExchangeServer server = serverMap.get(key); 
        if (server == null) { 
               //创建 Server 
            serverMap.put(key, createServer(url)); 
        } else { 
            //server支持reset,配合override功能使用 
            server.reset(url); 
        } 
       } 
   } 
    
   private ExchangeServer createServer(URL url) { 
       //默认开启server关闭时发送readonly事件 
       url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); 
       //默认开启heartbeat 
       url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 
       //默认使用netty 
       String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); 
 
       if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) 
           throw new RpcException("Unsupported server type: " + str + ", url: " + url); 
       //默认使用dubbo协议编码 
       url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); 
       ExchangeServer server; 
       try { 
           //HeaderExchangeServer 在此处已经开启了Netty Server 进行监听 
           server = Exchangers.bind(url, requestHandler); 
       } catch (RemotingException e) { 
           throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); 
       } 
       str = url.getParameter(Constants.CLIENT_KEY); 
       if (str != null && str.length() > 0) { 
           Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 
           if (!supportedTypes.contains(str)) { 
               throw new RpcException("Unsupported client type: " + str); 
           } 
       } 
       return server; 
   } 
在上面的代码中:Exchangers.bind(url, requestHandler)  默认为:HeaderExchanger.bind()
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
        //Transporters默认为NettyTransporter 
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 
    } 
代码运行到这里可以看到传输方式了,dubbo默认采用的通信方式为 NettyTransporter ,再来看NettyTransporter.bind方法
public static final String NAME = "netty"; 
     
    public Server bind(URL url, ChannelHandler listener) throws RemotingException { 
        return new NettyServer(url, listener); 
    } 

已经能看到NettyServer了,dubbo在暴露服务最终开启的netty服务监听,监听消费者发送的请求,通过反射调用方法得到结果通过 tcp/ip 网络传输返回给消费者。再进入到NettyServer中我们就能看到非常传统的开启Netty服务的代码了
protected void doOpen() throws Throwable { 
        NettyHelper.setNettyLoggerFactory(); 
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); 
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); 
        //最后一个参数为 NIO 最大工作线程数 
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); 
        //netty server 启动器 
        bootstrap = new ServerBootstrap(channelFactory); 
         
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); 
        channels = nettyHandler.getChannels(); 
        // https://issues.jboss.org/browse/NETTY-365 
        // https://issues.jboss.org/browse/NETTY-379 
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); 
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
            public ChannelPipeline getPipeline() { 
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); 
                ChannelPipeline pipeline = Channels.pipeline(); 
                /*int idleTimeout = getIdleTimeout(); 
                if (idleTimeout > 10000) { 
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); 
                }*/ 
                pipeline.addLast("decoder", adapter.getDecoder()); 
                pipeline.addLast("encoder", adapter.getEncoder()); 
                pipeline.addLast("handler", nettyHandler); 
                return pipeline; 
            } 
        }); 
        // 创建一个绑定到指定地址的新通道,也就是绑定IP、端口供客户端连接 
        channel = bootstrap.bind(getBindAddress()); 
    } 
上面的代码执行完成后,netty的服务端就已经开启了,可以接收客户端的连接了,但客户端连接上来要怎么处理呢?消息接收、发送怎么处理呢?所有的处理都在上面代码的 NettyHandler类中,Nettyhandler继承了Netty包中的的SimpleChannelHandler

NettyHandler extends SimpleChannelHandler  
重写了 channelConnected、channelDisconnected、messageReceived等方法,而我们比较关注的可能是messagereceived方法,在收到消息时如何处理,但今天暂时先不看dubbo如果处理消息,只看暴露,消息处理如何实现异步通信下一节再讲。

/** 
     * 收到消息时触发 
     * @param ctx 
     * @param e 
     * @throws Exception 
     */ 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 
        try { 
            handler.received(channel, e.getMessage()); 
        } finally { 
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 
        } 
    } 
从前面知道,开启netty服务是在RegistryProtocol.export 的 doLocalExport 中,在开启了netty服务后,就是在zookeeper上注册服务节点了,消费者在消费服务时会根据消费的接口名找到对应的zookeeper节点目录,对目录进行监听,接收推送

//registry provider 
final Registry registry = getRegistry(originInvoker); 
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 
//调用zodoRegister的doRegister 创建zookeeper的服务节点 
registry.register(registedProviderUrl); 
// 订阅override数据 
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); 
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 
//订阅 
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 
dubbo服务在zookeeper上的节点注册是:com.alibaba.dubbo.registry.support.FailbackRegistry#register

@Override 
    public void register(URL url) { 
        super.register(url); 
        failedRegistered.remove(url); 
        failedUnregistered.remove(url); 
        try { 
            // 向服务器端发送注册请求 
            doRegister(url); 
因为doRegister是一个抽象的方法,查看他的实现可以看到:




从上图可以看到doRegister实现有 dubbo、redis、zookeeper,这也是在我们配置时经常看到的 注册协议的配置 ,最为常用的就是 zookeeper了,所以再看ZookeeperRegistry的代码,看他的doRegistry干什么了如下
protected void doRegister(URL url) { 
       try { 
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 
       } catch (Throwable e) { 
           throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 
       } 
   } 
其实从上面已经可以看到 在zookeeper上面创建 节点了,默认不分组的情况下,服务结构如下:/dubbo/XXXXservice/consumers、providers



至此,dubbo的暴露基本上已经完成,开启了netty服务,注册了zookeeper的节点,就等着消费者连接上来使用了。下一节将介绍dubbo的消息发送和接收,NIO异步通讯的实现。

关注获取视频

  • 大小: 39.9 KB
  • 大小: 37.7 KB
  • 大小: 15.9 KB
分享到:
评论

相关推荐

    dubbo源码分析系列

    《Dubbo源码分析系列》是一份深入探讨Java开源...通过对《Dubbo源码分析系列》的学习,开发者不仅可以掌握Dubbo的基本使用,还能深入理解其设计思想,从而更好地在实际项目中应用和优化Dubbo,提高系统的稳定性和效率。

    dubbo源码解析2

    在开始深入解析Dubbo源码之前,首先需要明确的是,Dubbo虽然代码量不算庞大,但是它涉及的技术领域非常广泛,对于初学者来说,可能需要具备一定的前置知识才能更好地理解和学习。以下是建议的学习路径: 1. **Java...

    dubbo入门学习框架源码

    通过深入学习和实践Dubbo源码,开发者不仅可以更好地理解Dubbo的工作原理,还能在实际项目中灵活运用,解决各种复杂的分布式服务问题。Dubbo的源码阅读也是一个不断提升自身技术深度的过程,有助于成长为更优秀的...

    dubbo源码解析 1 pdf2.0

    根据给定的文件信息,以下是关于Dubbo源码解析的详细知识点: 首先,阅读Dubbo源码前需要一定的预备知识。这包括但不限于以下几点: 1. Java编程语言:掌握Java编程基础,阅读《Java编程思想》能够有助于理解源码...

    dubbo源码解析

    综上所述,阅读和理解Dubbo源码需要较为扎实的Java基础和丰富的框架应用经验,对于希望深入学习分布式系统和服务治理的开发者来说,Dubbo是一个非常好的学习案例。通过本文的详细分析,可以帮助读者快速掌握Dubbo的...

    dubbo源码包

    《Dubbo源码解析:深度探索高性能服务框架》 Dubbo作为阿里巴巴开源的一款高性能、轻量级的服务治理框架,其源码的深入理解和分析对于Java开发者来说,具有极高的学习价值。本文将针对提供的dubbo-2.4.3-sources....

    apache dubbo 3.0.7源码

    Apache Dubbo 3.0.7 是一个高性能、轻量级的开源Java RPC框架,它由阿里集团贡献并维护,现已成为Apache顶级项目。Dubbo的主要目标是提供一种...同时,掌握Dubbo源码也有助于与社区保持同步,及时了解和应用最新特性。

    dubbo的简单demo源码

    【标题】:Dubbo的简单demo源码解析 ...总结来说,这个“dubbo的简单demo源码”是学习Dubbo的入门教程,通过实践,你可以了解到Dubbo的核心组件和服务交互方式,为后续的微服务架构设计和开发打下基础。

    dubbo-master源码

    通过对Dubbo源码的阅读和分析,我们可以更深入地理解服务治理的原理,学习到如何设计高可用、可扩展的微服务架构,这对于提升我们的Java编程技能和系统设计能力具有重要的实践意义。同时,Dubbo也是许多企业级项目的...

    MyDubbo:dubbo源码学习笔记

    《MyDubbo:Dubbo源码学习笔记》 在Java开发领域,Dubbo是一个非常知名的分布式服务框架,它由阿里巴巴开源并广泛应用于大型企业系统。本篇笔记将深入探讨Dubbo的核心概念、工作原理以及源码解析,帮助开发者更好地...

    dubbo 源码

    总结,Dubbo的源码学习不仅可以帮助我们理解分布式服务治理的实现原理,还能提升我们的系统设计能力。通过深入分析`Provider`、`Consumer`、`Registry`、`Proxy`等核心组件,我们可以更好地运用Dubbo解决实际项目中...

    dubbo教程视频源码

    根据提供的链接,我们可以推测这是一份包含了 Dubbo 教程视频、文档和源码的学习资料。这类资源对于初学者来说是非常宝贵的,因为它不仅覆盖了 Dubbo 的基础知识,还深入讲解了高级特性和系统架构等内容,并且通过...

    dubbo示例源码及相关文档

    通过分析源码,我们可以了解Dubbo如何通过Spring整合,如何定义和消费服务,以及服务注册、发现、调用等过程。同时,配合文档,能更全面地掌握Dubbo的各种特性,为实际项目中的应用打下坚实基础。在实践中,不断探索...

    dubbo-die::star::star::star::star:dubbo源码分析

    《Dubbo源码分析:深度探索服务治理框架的精髓》 Dubbo,作为阿里巴巴开源的一款高性能、轻量级的服务治理框架,一直以来都是Java开发者在分布式系统领域的热门选择。本文将深入探讨Dubbo的核心功能和设计理念,并...

    dubbo-master.zip

    通过学习和研究这个压缩包,你可以深入了解Dubbo的工作原理、服务治理机制以及如何通过源码定制和扩展Dubbo功能。这对于开发者来说,无论是调试问题、优化性能还是开发新的特性,都是极其宝贵的资源。同时,通过`...

    dubbo源码实例

    2. 服务暴露:通过`@Service`注解标记实现了服务接口的类,配置服务元数据,如版本、组等信息,使服务对外暴露。 3. Spring整合:Dubbo与Spring框架深度集成,可以通过Spring配置文件或注解来管理服务提供者,使得...

    dubbo 分布式搭建源码

    【Dubbo 分布式搭建源码详解】...通过对 Dubbo 的源码学习,我们可以深入了解其实现原理,从而更好地应用和优化分布式系统。这个提供的 "dubbo-master" 压缩包包含了 Dubbo 的完整源码,可供开发者进行深度学习和研究。

    dubbo-2.5.3源码

    总结,Dubbo 2.5.3源码的学习能让我们更深入理解分布式服务治理的原理,对于提升Java开发者的架构设计能力、优化系统性能有着极大的帮助。通过对源码的分析,我们可以了解到Dubbo如何优雅地处理服务发现、调用、监控...

    dubbo应用实例源码工程

    通过阅读这篇文章,开发者可以学习到如何配置和启动Dubbo服务,以及如何进行服务调用和服务治理。 在这个源码工程中,我们可以深入学习以下关键知识点: 1. **Dubbo服务提供者和服务消费者**:Dubbo的核心概念是...

Global site tag (gtag.js) - Google Analytics