`
y806839048
  • 浏览: 1127102 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

dubbo源码解析

阅读更多

 

dubbo源码:

 

register:

 

ZookeeperRegistry这是zk的节点操作类,所有的注册,订阅对应到节点上

 

ZookeeperRegistryFactory  通过这个工厂可以获取ZookeeperRegistry

 

Monitor;

DubboMonitor这是监控的实际实现类,打包成Statistics,收集信息

DubboMonitorFactory这是包装DubboMonitor获取实力的工厂类

 

 

 

 

rpc:

 

服务根据url,端口信息,反射暴露成Exporter(由invoker而来)

Exporter<?> rpcExporter = protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rmi://127.0.0.1:9001/TestService")));

服务实例反射成对象,然后用spring纳入对象的方法将此对象到jvm

 

客户端通过接口url等元数据获取服务的代理对象,用spring纳入jvm对象管理,服务代理对象和服务对象之间通过netty通信(由此可见invoker集成了netty通信框架)(由invoker而来)

service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rmi://127.0.0.1:9001/TestService")));

    

 服务端:invoker的代理对象加功之后生成export---服务端被调用的是invoker的代理对象

消费端:invoker的代理对象用于通信

通过invoker通信(invoker)(代理对象在jvm中,直接获取即可,然后用代理的形式调用(反射对象已经有了)),--到了服务端根据参数信息获取之前生成的服务代理实例---调用

 

 

 

protocol.export 最终通过spring  rmi暴露服务,转化成内存对象实例

 protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {

        final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter();

        rmiServiceExporter.setRegistryPort(url.getPort());

        rmiServiceExporter.setServiceName(url.getPath());

        rmiServiceExporter.setServiceInterface(type);

        rmiServiceExporter.setService(impl);

        try {

            rmiServiceExporter.afterPropertiesSet();//spring纳入对象的方法

        } catch (RemoteException e) {

            throw new RpcException(e.getMessage(), e);

        }

        return new Runnable() {

            public void run() {

                try {

                    rmiServiceExporter.destroy();

                } catch (Throwable e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        };

    }

 

 

 

proxy.getProxy通过jdk代理获取暴露在内存的实例

 

 public <T> T getProxy(Invoker<T> invoker) throws RpcException {

        T proxy = proxyFactory.getProxy(invoker);

        if (GenericService.class != invoker.getInterface()) {

            String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));

            if (ConfigUtils.isNotEmpty(stub)) {

                Class<?> serviceType = invoker.getInterface();

                if (ConfigUtils.isDefault(stub)) {

                    if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {

                        stub = serviceType.getName() + "Stub";

                    } else {

                        stub = serviceType.getName() + "Local";

                    }

                }

                try {

                    Class<?> stubClass = ReflectUtils.forName(stub);

                    if (!serviceType.isAssignableFrom(stubClass)) {

                        throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());

                    }

                    try {

                        Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);

                        proxy = (T) constructor.newInstance(new Object[]{proxy});

 

 

....

}}}}}

 

 

 

cluster:

 

 

 

 

AbstractClusterInvoker  这个是主要业务类

 

 

 

AbstractLoadBalance:

 

  public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {//配置哪种转发策略就是这个配置的实例调用这个方法

        if (invokers == null || invokers.size() == 0)

            return null;

        if (invokers.size() == 1)

            return invokers.get(0);

        return doSelect(invokers, url, invocation);//这里根据4中不同的策略配置选用不同的类的doSelect。select是谁调用就是谁,选择出相应的invoker

    }

 

//一般第三方框架,会自己的方法1中调用自己的抽象方法2,这个抽象方法具体的实现类看配置中注入的,这个类调用方法1,1调用2,那么调用2的这个对象就是调用1的

 

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

 

 

 

1,ConsistentHashLoadBalance

 */

public class ConsistentHashLoadBalance extends AbstractLoadBalance {

 

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

 

    @SuppressWarnings("unchecked")

    @Override

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

        int identityHashCode = System.identityHashCode(invokers);

        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);

        if (selector == null || selector.identityHashCode != identityHashCode) {

            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));

            selector = (ConsistentHashSelector<T>) selectors.get(key);

        }

        return selector.select(invocation);

    }

    }

 

 

 

 public Invoker<T> select(Invocation invocation) {

            String key = toKey(invocation.getArguments());

            byte[] digest = md5(key);

            return selectForKey(hash(digest, 0));

        }

 

 

private Invoker<T> selectForKey(long hash) {

            Invoker<T> invoker;

            Long key = hash;

            if (!virtualInvokers.containsKey(key)) {

                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);

                if (tailMap.isEmpty()) {

                    key = virtualInvokers.firstKey();

                } else {

                    key = tailMap.firstKey();

                }

            }

            invoker = virtualInvokers.get(key);

            return invoker;

        }

 

 

 

 

 ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {

            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();

            this.identityHashCode = identityHashCode;

            URL url = invokers.get(0).getUrl();

            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);

            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));

            argumentIndex = new int[index.length];

            for (int i = 0; i < index.length; i++) {

                argumentIndex[i] = Integer.parseInt(index[i]);

            }

            for (Invoker<T> invoker : invokers) {

                String address = invoker.getUrl().getAddress();

                for (int i = 0; i < replicaNumber / 4; i++) {

                    byte[] digest = md5(address + i);

                    for (int h = 0; h < 4; h++) {////url地址加上一定的随机盐值作为key

                        long m = hash(digest, h);

                        virtualInvokers.put(m, invoker);

                    }

                }

            }

        }

 

 

 

 

2,RandomLoadBalance

 

  protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

        int length = invokers.size(); // Number of invokers

        int totalWeight = 0; // The sum of weights

        boolean sameWeight = true; // Every invoker has the same weight?

        for (int i = 0; i < length; i++) {

            int weight = getWeight(invokers.get(i), invocation);

            totalWeight += weight; // Sum

            if (sameWeight && i > 0

                    && weight != getWeight(invokers.get(i - 1), invocation)) {

                sameWeight = false;

            }

        }

        if (totalWeight > 0 && !sameWeight) {

            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.

            int offset = random.nextInt(totalWeight);

            // Return a invoker based on the random value.

            for (int i = 0; i < length; i++) {

                offset -= getWeight(invokers.get(i), invocation);

                if (offset < 0) {

                    return invokers.get(i);

                }

            }

        }

        // If all invokers have the same weight value or totalWeight=0, return evenly.

        return invokers.get(random.nextInt(length));//范围内的随机数

    }

 

 

 

 

选择到了invoker,然后调用,Result中含有返回,或者异常信息给消费方:

 

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

 

    public FailfastClusterInvoker(Directory<T> directory) {

        super(directory);

    }

 

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        checkInvokers(invokers, invocation);

        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

        try {

            return invoker.invoke(invocation);

        } catch (Throwable e) {

            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.

                throw (RpcException) e;

            }

            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);

        }

    }

}

 

 

 

 

test类入手,配置文件入手看源码,有test类可以在 @Test中debug源码,debug  as  运行

 

cache:

 

框架的写法支持配置化的原因就是应用了抽象工厂模式,--》代码中用抽象(多态),通过配置注入具体工厂类,具体工厂类中注入这个工厂要生产的具体类使用(抽象工厂类)。

 

 

应用:

 

CacheFilter  缓存应用具体工厂获取具体缓存,操作缓存

public class CacheFilter implements Filter {

 

    private CacheFactory cacheFactory;////前台配置注入哪种具体缓工厂(具体工厂中注入具体缓存),框架中就用哪一个具体工厂(默认的是注入某一个具体类,也可是抽象类)

 

 

 

    }

 

 

Cache(获取缓存中的value)  CacheFactory(获取缓存)  缓存工厂(接口)

 

AbstractCacheFactory (抽象类)  implements CacheFactory(接口)

 

 

 

可选的多种配置:

 

 

具体的工厂,缓存(具体用哪一个根据配置)

 

1,

ThreadLocalCache implements Cache

ThreadLocalCacheFactory extends AbstractCacheFactory 

 

 

2,

public class LruCache implements Cache 

public class LruCacheFactory extends AbstractCacheFactory 

 

 

 

3,

 

public class JCache implements com.alibaba.dubbo.cache.Cache 

 

public class JCacheFactory extends AbstractCacheFactory

 

 

 

 

同理

Validation:

 

 

ValidationFilter  应用

 

 

@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.VALIDATION_KEY, order = 10000)

public class ValidationFilter implements Filter {

 

    private Validation validation;

 

    public void setValidation(Validation validation) {

        this.validation = validation;

    }

    }

 

 

public interface Validation  接口获取  Validator

 

public interface Validator  接口执行校验

 

 

public abstract class AbstractValidation implements Validation  抽象工厂类

 

 

 

 

具体类:

 

public class JValidation extends AbstractValidation 具体工厂   

        return new JValidator(url);

 

public class JValidator implements Validator   具体校验

 

 

 

 

remoting:

 

 

HeartbeatHandlerTest:

 public void testHeartbeat() throws Exception {

        URL serverURL = URL.valueOf("header://localhost:55555");

        serverURL = serverURL.addParameter(Constants.HEARTBEAT_KEY, 1000);

        TestHeartbeatHandler handler = new TestHeartbeatHandler();

        server = Exchangers.bind(serverURL, handler);

        System.out.println("Server bind successfully");

 

        client = Exchangers.connect(serverURL);

        Thread.sleep(10000);

        System.err.println("++++++++++++++ disconnect count " + handler.disconnectCount);

        System.err.println("++++++++++++++ connect count " + handler.connectCount);

        Assert.assertTrue(handler.disconnectCount == 0);

        Assert.assertTrue(handler.connectCount == 1);

    }

 

 

大多数源码用map的时候用享元模式,如下:

 

 

 private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

 

 

   static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {

        if (ch == null) {

            return null;

        }

        NettyChannel ret = channelMap.get(ch);

        if (ret == null) {

            NettyChannel nc = new NettyChannel(ch, url, handler);

            if (ch.isConnected()) {

                ret = channelMap.putIfAbsent(ch, nc);

            }

            if (ret == null) {

                ret = nc;

            }

        }

        return ret;

    }

 

 

异步的方式返回:

 

 

  public void testHandler() throws Exception {

        //Thread.sleep(20000);

        /*client.request("world\r\n");

        Future future = client.request("world", 10000);

        String result = (String)future.get();

        Assert.assertEquals("Did you say 'world'?\r\n",result);*/

    }

 

 

 public void testFuture() throws Exception {

        ResponseFuture future = client.request(new World("world"));

        Hello result = (Hello) future.get();

        Assert.assertEquals("hello,world", result.getName());

    }

 

 

MockedClient:

 

 public ResponseFuture request(Object msg, int timeout) throws RemotingException {

        this.invoked = msg;

        return new ResponseFuture() {

            public Object get() throws RemotingException {

                return received;

            }

 

            public Object get(int timeoutInMillis) throws RemotingException {

                return received;

            }

 

            public boolean isDone() {

                return true;

            }

 

            public void setCallback(ResponseCallback callback) {

            }

        };

    }

 

 

 

HeaderExchangeChannel:

 

 public ResponseFuture request(Object request, int timeout) throws RemotingException {

        if (closed) {

            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");

        }

        // create request.

        Request req = new Request();

        req.setVersion("2.0.0");

        req.setTwoWay(true);

        req.setData(request);

        DefaultFuture future = new DefaultFuture(channel, req, timeout);///这里有channel,自然future.get可以找到这channel获取channel中的返回值

        try {

            channel.send(req);

        } catch (RemotingException e) {

            future.cancel();

            throw e;

        }

        return future;

    }

 

 

  public DefaultFuture(Channel channel, Request request, int timeout) {

        this.channel = channel;

        this.request = request;

        this.id = request.getId();

        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

        // put into waiting map.

        FUTURES.put(id, this);

        CHANNELS.put(id, channel);

    }

 

 

public class DefaultFuture implements ResponseFuture {

 static {

        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");

        th.setDaemon(true);

        th.start();

    }

 private static class RemotingInvocationTimeoutScan implements Runnable {

 

        public void run() {

            while (true) {////不断循环获取结果,放在固定的通道

                try {

                    for (DefaultFuture future : FUTURES.values()) {

                        if (future == null || future.isDone()) {

                            continue;

                        }

                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {

                            // create exception response.

                            Response timeoutResponse = new Response(future.getId());

                            // set timeout status.

                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);

                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));

                            // handle response.

                            DefaultFuture.received(future.getChannel(), timeoutResponse);

                        }

                    }

                    Thread.sleep(30);

                } catch (Throwable e) {

                    logger.error("Exception when scan the timeout invocation of remoting.", e);

                }

            }

        }

    }

 

}

 

 

 

 

 

 

 

  public Object get() throws RemotingException {

        return get(timeout);

    }

 

    public Object get(int timeout) throws RemotingException {

        if (timeout <= 0) {

            timeout = Constants.DEFAULT_TIMEOUT;

        }

        if (!isDone()) {///只要没有做完就不断循环,没有返回,主线程就等待,类似Future.get() 通过lock阻塞一样

            long start = System.currentTimeMillis();

            lock.lock();

            try {

                while (!isDone()) {

                    done.await(timeout, TimeUnit.MILLISECONDS);

                    if (isDone() || System.currentTimeMillis() - start > timeout) {

                        break;

                    }

                }

            } catch (InterruptedException e) {

                throw new RuntimeException(e);

            } finally {

                lock.unlock();

            }

            if (!isDone()) {

                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));

            }

        }

        return returnFromResponse();

    }

 

 

 

 

 

 

 

 

MockedClient implements ExchangeClient

HeaderExchangeClient implements ExchangeClient

LazyConnectExchangeClient implements ExchangeClient 

class ReferenceCountExchangeClient implements ExchangeClient

ExchangeClient中引用ExchangeChannel:

private final ExchangeChannel channel;

 

 

 

HeaderExchangeChannel implements ExchangeChannel 

MockChannel implements ExchangeChannel

 

 参考:

https://blog.csdn.net/paul_wei2008/article/details/70076898

 

分享到:
评论

相关推荐

    dubbo源码解析2

    ### Dubbo源码解析2 #### 一、源码阅读路径 在开始深入解析Dubbo源码之前,首先需要明确的是,Dubbo虽然代码量不算庞大,但是它涉及的技术领域非常广泛,对于初学者来说,可能需要具备一定的前置知识才能更好地...

    dubbo源码解析 1 pdf2.0

    根据给定的文件信息,以下是关于Dubbo源码解析的详细知识点: 首先,阅读Dubbo源码前需要一定的预备知识。这包括但不限于以下几点: 1. Java编程语言:掌握Java编程基础,阅读《Java编程思想》能够有助于理解源码...

    dubbo源码解析2.0.7z

    《Dubbo源码解析2.0》是一份深入剖析阿里巴巴开源框架Dubbo核心机制的资料,专注于2.0版本的源代码分析。Dubbo作为Java领域最知名的分布式服务框架之一,其设计理念、实现原理以及在实际应用中的优化策略都是开发者...

    Dubbo源码解析

    ### Dubbo源码解析——Filter和Listener的注入过程 #### 前言 本文将深入探讨Dubbo框架中Filter和Listener的注入过程。不同于普通的概念介绍或功能概述,本文聚焦于技术实现细节,旨在帮助中高级软件开发工程师...

    dubbo 源码解析

    dubbo源码一览

    dubbo源码解析2.01.pdf

    ### Dubbo源码解析知识点概览 #### 一、Dubbo简介与背景 - **背景**:Apache Dubbo是一款高性能、轻量级的开源服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。Dubbo版本2.01在...

    dubbo源码分析pdf.zip

    《Dubbo源码分析》是一套深入探讨Apache Dubbo这一著名Java开源框架的书籍,旨在帮助开发者更好地理解和应用Dubbo。Dubbo是一个高性能、轻量级的服务治理框架,广泛应用于微服务架构中,以实现服务的发布、发现、...

    dubbo源码分析系列

    《Dubbo源码分析系列》是一份深入探讨Java开源框架Dubbo核心原理和技术细节的资料。Dubbo,作为阿里巴巴的一款高性能、轻量级的服务治理框架,它为分布式系统提供了服务发现、调用、负载均衡、容错等关键功能。这份...

    dubbo源码解析(含注释)

    本代码是github下载的dubbo源码,构建好了,可直接使用,其中包含一些demo和看的过程中的一些见解(注释),还包含sentinel、ZooInspector,以及新老版本的dubbo管控台,下载下来打开可直接使用,需要安装zookeeper,...

    Dubbo 源码分析

    深入理解Dubbo的源码有助于开发者优化服务性能,解决实际问题,以及更好地定制化服务。下面,我们将详细探讨Dubbo的几个关键模块。 **1. 服务提供者(Provider)** 服务提供者是Dubbo架构中的基础组件,它负责暴露...

    dubbo入门学习框架源码

    四、Dubbo源码解析 1. 远程调用(RPC):Dubbo的Remoting层处理服务间的网络通信,包括协议解析、序列化、连接管理等。Protocol接口定义了服务调用的基本操作,如refer、invoke等,而Exporter和Invoker接口则封装了...

    dubbo 视频下载

    【标题】"Dubbo视频下载"涉及的知识点主要包括以下几个方面: 1. **Dubbo框架**:Dubbo是一款高性能、轻量级的Java RPC框架,由阿里巴巴开源,它提供了服务治理、监控、容错和负载均衡等功能,是企业级分布式应用...

Global site tag (gtag.js) - Google Analytics