dubbo源码:
register:
ZookeeperRegistry这是zk的节点操作类,所有的注册,订阅对应到节点上
ZookeeperRegistryFactory 通过这个工厂可以获取ZookeeperRegistry
Monitor;
DubboMonitor这是监控的实际实现类,打包成Statistics,收集信息
DubboMonitorFactory这是包装DubboMonitor获取实力的工厂类
rpc:
服务根据url,端口信息,反射暴露成Exporter(由invoker而来)
Exporter<?> rpcExporter = protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rmi://127.0.0.1:9001/TestService")));
服务实例反射成对象,然后用spring纳入对象的方法将此对象到jvm
客户端通过接口url等元数据获取服务的代理对象,用spring纳入jvm对象管理,服务代理对象和服务对象之间通过netty通信(由此可见invoker集成了netty通信框架)(由invoker而来)
service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rmi://127.0.0.1:9001/TestService")));
服务端:invoker的代理对象加功之后生成export---服务端被调用的是invoker的代理对象
消费端:invoker的代理对象用于通信
通过invoker通信(invoker)(代理对象在jvm中,直接获取即可,然后用代理的形式调用(反射对象已经有了)),--到了服务端根据参数信息获取之前生成的服务代理实例---调用
protocol.export 最终通过spring rmi暴露服务,转化成内存对象实例
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter();
rmiServiceExporter.setRegistryPort(url.getPort());
rmiServiceExporter.setServiceName(url.getPath());
rmiServiceExporter.setServiceInterface(type);
rmiServiceExporter.setService(impl);
try {
rmiServiceExporter.afterPropertiesSet();//spring纳入对象的方法
} catch (RemoteException e) {
throw new RpcException(e.getMessage(), e);
}
return new Runnable() {
public void run() {
try {
rmiServiceExporter.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
}
proxy.getProxy通过jdk代理获取暴露在内存的实例
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
T proxy = proxyFactory.getProxy(invoker);
if (GenericService.class != invoker.getInterface()) {
String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> serviceType = invoker.getInterface();
if (ConfigUtils.isDefault(stub)) {
if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
stub = serviceType.getName() + "Stub";
} else {
stub = serviceType.getName() + "Local";
}
}
try {
Class<?> stubClass = ReflectUtils.forName(stub);
if (!serviceType.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
}
try {
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
proxy = (T) constructor.newInstance(new Object[]{proxy});
....
}}}}}
cluster:
AbstractClusterInvoker 这个是主要业务类
AbstractLoadBalance:
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {//配置哪种转发策略就是这个配置的实例调用这个方法
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);//这里根据4中不同的策略配置选用不同的类的doSelect。select是谁调用就是谁,选择出相应的invoker
}
//一般第三方框架,会自己的方法1中调用自己的抽象方法2,这个抽象方法具体的实现类看配置中注入的,这个类调用方法1,1调用2,那么调用2的这个对象就是调用1的
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
1,ConsistentHashLoadBalance
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
Invoker<T> invoker;
Long key = hash;
if (!virtualInvokers.containsKey(key)) {
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
if (tailMap.isEmpty()) {
key = virtualInvokers.firstKey();
} else {
key = tailMap.firstKey();
}
}
invoker = virtualInvokers.get(key);
return invoker;
}
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {////url地址加上一定的随机盐值作为key
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
2,RandomLoadBalance
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));//范围内的随机数
}
选择到了invoker,然后调用,Result中含有返回,或者异常信息给消费方:
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
}
}
test类入手,配置文件入手看源码,有test类可以在 @Test中debug源码,debug as 运行
cache:
框架的写法支持配置化的原因就是应用了抽象工厂模式,--》代码中用抽象(多态),通过配置注入具体工厂类,具体工厂类中注入这个工厂要生产的具体类使用(抽象工厂类)。
应用:
CacheFilter 缓存应用具体工厂获取具体缓存,操作缓存
public class CacheFilter implements Filter {
private CacheFactory cacheFactory;////前台配置注入哪种具体缓工厂(具体工厂中注入具体缓存),框架中就用哪一个具体工厂(默认的是注入某一个具体类,也可是抽象类)
}
Cache(获取缓存中的value) CacheFactory(获取缓存) 缓存工厂(接口)
AbstractCacheFactory (抽象类) implements CacheFactory(接口)
可选的多种配置:
具体的工厂,缓存(具体用哪一个根据配置)
1,
ThreadLocalCache implements Cache
ThreadLocalCacheFactory extends AbstractCacheFactory
2,
public class LruCache implements Cache
public class LruCacheFactory extends AbstractCacheFactory
3,
public class JCache implements com.alibaba.dubbo.cache.Cache
public class JCacheFactory extends AbstractCacheFactory
同理
Validation:
ValidationFilter 应用
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.VALIDATION_KEY, order = 10000)
public class ValidationFilter implements Filter {
private Validation validation;
public void setValidation(Validation validation) {
this.validation = validation;
}
}
public interface Validation 接口获取 Validator
public interface Validator 接口执行校验
public abstract class AbstractValidation implements Validation 抽象工厂类
具体类:
public class JValidation extends AbstractValidation 具体工厂
return new JValidator(url);
public class JValidator implements Validator 具体校验
remoting:
HeartbeatHandlerTest:
public void testHeartbeat() throws Exception {
URL serverURL = URL.valueOf("header://localhost:55555");
serverURL = serverURL.addParameter(Constants.HEARTBEAT_KEY, 1000);
TestHeartbeatHandler handler = new TestHeartbeatHandler();
server = Exchangers.bind(serverURL, handler);
System.out.println("Server bind successfully");
client = Exchangers.connect(serverURL);
Thread.sleep(10000);
System.err.println("++++++++++++++ disconnect count " + handler.disconnectCount);
System.err.println("++++++++++++++ connect count " + handler.connectCount);
Assert.assertTrue(handler.disconnectCount == 0);
Assert.assertTrue(handler.connectCount == 1);
}
大多数源码用map的时候用享元模式,如下:
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
异步的方式返回:
public void testHandler() throws Exception {
//Thread.sleep(20000);
/*client.request("world\r\n");
Future future = client.request("world", 10000);
String result = (String)future.get();
Assert.assertEquals("Did you say 'world'?\r\n",result);*/
}
public void testFuture() throws Exception {
ResponseFuture future = client.request(new World("world"));
Hello result = (Hello) future.get();
Assert.assertEquals("hello,world", result.getName());
}
MockedClient:
public ResponseFuture request(Object msg, int timeout) throws RemotingException {
this.invoked = msg;
return new ResponseFuture() {
public Object get() throws RemotingException {
return received;
}
public Object get(int timeoutInMillis) throws RemotingException {
return received;
}
public boolean isDone() {
return true;
}
public void setCallback(ResponseCallback callback) {
}
};
}
HeaderExchangeChannel:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);///这里有channel,自然future.get可以找到这channel获取channel中的返回值
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
public class DefaultFuture implements ResponseFuture {
static {
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
th.setDaemon(true);
th.start();
}
private static class RemotingInvocationTimeoutScan implements Runnable {
public void run() {
while (true) {////不断循环获取结果,放在固定的通道
try {
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
logger.error("Exception when scan the timeout invocation of remoting.", e);
}
}
}
}
}
public Object get() throws RemotingException {
return get(timeout);
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {///只要没有做完就不断循环,没有返回,主线程就等待,类似Future.get() 通过lock阻塞一样
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
MockedClient implements ExchangeClient
HeaderExchangeClient implements ExchangeClient
LazyConnectExchangeClient implements ExchangeClient
class ReferenceCountExchangeClient implements ExchangeClient
ExchangeClient中引用ExchangeChannel:
private final ExchangeChannel channel;
HeaderExchangeChannel implements ExchangeChannel
MockChannel implements ExchangeChannel
参考:
https://blog.csdn.net/paul_wei2008/article/details/70076898
相关推荐
### Dubbo源码解析2 #### 一、源码阅读路径 在开始深入解析Dubbo源码之前,首先需要明确的是,Dubbo虽然代码量不算庞大,但是它涉及的技术领域非常广泛,对于初学者来说,可能需要具备一定的前置知识才能更好地...
根据给定的文件信息,以下是关于Dubbo源码解析的详细知识点: 首先,阅读Dubbo源码前需要一定的预备知识。这包括但不限于以下几点: 1. Java编程语言:掌握Java编程基础,阅读《Java编程思想》能够有助于理解源码...
《Dubbo源码解析2.0》是一份深入剖析阿里巴巴开源框架Dubbo核心机制的资料,专注于2.0版本的源代码分析。Dubbo作为Java领域最知名的分布式服务框架之一,其设计理念、实现原理以及在实际应用中的优化策略都是开发者...
### Dubbo源码解析——Filter和Listener的注入过程 #### 前言 本文将深入探讨Dubbo框架中Filter和Listener的注入过程。不同于普通的概念介绍或功能概述,本文聚焦于技术实现细节,旨在帮助中高级软件开发工程师...
dubbo源码一览
### Dubbo源码解析知识点概览 #### 一、Dubbo简介与背景 - **背景**:Apache Dubbo是一款高性能、轻量级的开源服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。Dubbo版本2.01在...
《Dubbo源码分析》是一套深入探讨Apache Dubbo这一著名Java开源框架的书籍,旨在帮助开发者更好地理解和应用Dubbo。Dubbo是一个高性能、轻量级的服务治理框架,广泛应用于微服务架构中,以实现服务的发布、发现、...
《Dubbo源码分析系列》是一份深入探讨Java开源框架Dubbo核心原理和技术细节的资料。Dubbo,作为阿里巴巴的一款高性能、轻量级的服务治理框架,它为分布式系统提供了服务发现、调用、负载均衡、容错等关键功能。这份...
本代码是github下载的dubbo源码,构建好了,可直接使用,其中包含一些demo和看的过程中的一些见解(注释),还包含sentinel、ZooInspector,以及新老版本的dubbo管控台,下载下来打开可直接使用,需要安装zookeeper,...
深入理解Dubbo的源码有助于开发者优化服务性能,解决实际问题,以及更好地定制化服务。下面,我们将详细探讨Dubbo的几个关键模块。 **1. 服务提供者(Provider)** 服务提供者是Dubbo架构中的基础组件,它负责暴露...
四、Dubbo源码解析 1. 远程调用(RPC):Dubbo的Remoting层处理服务间的网络通信,包括协议解析、序列化、连接管理等。Protocol接口定义了服务调用的基本操作,如refer、invoke等,而Exporter和Invoker接口则封装了...
【标题】"Dubbo视频下载"涉及的知识点主要包括以下几个方面: 1. **Dubbo框架**:Dubbo是一款高性能、轻量级的Java RPC框架,由阿里巴巴开源,它提供了服务治理、监控、容错和负载均衡等功能,是企业级分布式应用...