对于Dubbo的服务提供者,主要有两种线程池,一种是IO处理线程池,另一种是服务调用线程池。而作为IO处理线程池,由于Dubbo基于Mina、Grizzly和Netty框架做IO组件,IO线程池都是基于这些框架来配置,比如Netty中的boss和worker线程池,Dubbo选择的是“无边界”的CachedThreadPool,这意味着对所有服务请求先做到“来者不拒”,但它进一步限制了IO处理的线程数,默认是核数+1,本文拿Netty组件举例,代码见NettyServer#open:
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
<!--StartFragment-->ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));<!--EndFragment-->
那么当请求到达Dubbo的提供者端,会进行哪些处理呢?用过Dubbo框架的同学哪怕没看过源码也能猜出个大概,请求处理会包含三部分:请求解析、服务调用和应答。请求解析需要确认请求的正确性,比如请求解码(比如协议是否正确)、请求是否合法(提供者端是否有该服务;该服务是否需要token验证来防止绕过注册中心 直连);服务调用过程就是提供者作为服务端的一个服务处理过程,这个过程需要用到前面说到的第二种服务调用线程池来执行,该过程通过线程池来和请求解析过程分开,这样做的目的一是过程解耦,二是可以做到服务提供者超时返回,为了让用户能对该过程进行拦截,Dubbo特意通过SPI实现了Filter机制,用户可以通过自定义Filter来对服务调用进行日志记录和监控,当然前提是服务调用线程池还没被请求打满;应答过程主要是对结果进行编码并返回。
我们先来仔细看看请求解析过程,我们这里参照的是Netty,使用过Netty的同学都知道,如果想自定义流处理、协议的编解码功能,需要自己去实现一些适配类,比如Netty3.x中的 SimpleChannelHandler和Netty4.x中的ChannelInboundHandlerAdapter,Dubbo的2.5.3版本依赖的是Netty3.x版本,我们这里直接可以从NettyServer#doOpen方法看出Dubbo向Netty中注册了哪些Handler,这些Handler是请求数据处理的第一道屏障:
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 无界的Netty boss线程池,负责和消费者建立新的连接
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
// 无界的Netty worker线程池,负责连接的数据交换
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// Netty服务启动类
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
// 解码处理器,{@link InternalDecoder}
pipeline.addLast("decoder", adapter.getDecoder());
// 编码处理器,{@link InternalEncoder}
pipeline.addLast("encoder", adapter.getEncoder());
// 数据解析后流程处理的起点,{@link NettyHandler}
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 绑定端口
channel = bootstrap.bind(getBindAddress());
}
从这里可以看出,如果我们在一个JVM进程只暴露一个Dubbo服务端口,那么一个JVM进程只会有一个NettyServer实例,也会只有一个NettyHandler实例,但如果应用即是消费者,也是提供者,那么将会存在多个NettyHandler。从上面代码也可以看出,Dubbo在Netty的Pipeline中只注册了三个Handler,而Dubbo内部也定义了一个ChannelHandler接口,用来将和Channel相关的处理串起来,而第一个ChannelHandler就是由NettyHandler来调用的。有趣的是NettyServer本身也是一个ChannelHandler。当Dubbo将Spring容器中的服务实例做了动态代理的处理后,就会通过NettyServer#doOpen来暴露服务端口,再接着将服务注册到注册中心。这些步骤做完后,Dubbo的消费者就可以来和提供者建立连接了,当然是消费者来主动建立连接,而提供者在初始化连接后会调用NettyHandler#channelConnected方法来创建一个NettyChannel:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// 从channelMap中创建或获取一个NettyChannel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
if (channel != null) {
// 如果在channels中没有则创建,注意这里的key是远端消费者的地址,即IP+端口
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
}
// 这里的handler正是创建此NettyHandler的NettyServer
handler.connected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
NettyHandler有两个重要的属性用来保存当前的Netty Channel和Netty Channel和Dubbo内部NettyChannel的映射关系:
private final org.jboss.netty.channel.Channel channel;
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
从上面的代码可以看出,就像Netty和Dubbo都有自己的ChannelHandler一样,Netty和Dubbo也有着自己的Channel。该方法最后会调用NettyServer#connected方法来检查新添加channel后是否会超出提供者配置的accepts配置,如果超出,则直接打印错误日志并关闭该Channel,这样的话消费者端自然会收到连接中断的异常信息,详细可以见AbstractServer#connected方法。这里我们也可以看出,消费者和提供者建立的每一个TCP连接都放到了NettyHandler的channels中。还记得我们在《Dubbo源代码实现五》中提到的,消费者和提供者之间默认只会建立一条TCP长连接,为了增加消费者调用服务提供者的吞吐量,可以在消费者的dubbo:reference中配置connections来单独增加消费者和服务提供者的TCP长连接吗?作为服务提供者,也同样可以限制所接收的连接数,例如:
<dubbo:protocol name="dubbo" port="8888" threads="500" accepts="200"/>
需要注意的是,这种配置的长连接不会像JDK中的线程池那样按需来建立,而是在消费者启动后就全部创建好,的如果消费者“太过分”的话,即消费者配置的连接数已经超过了服务提供者的accepts,那么多余连接的建立时会遭到提供者拒绝,于是消费者将收到如下异常:
[18/06/17 04:02:55:055 CST] DubboClientReconnectTimer-thread-2 WARN transport.AbstractClient: [DUBBO] client reconnect to 提供者IP+端口 find error . url: dubbo://提供者IP+端口/...略...
com.alibaba.dubbo.remoting.RemotingException: Failed connect to server /提供者IP+端口 from NettyClient 消费者IP using dubbo version 2.5.3, cause: Connect wait timeout: 1500000ms.
at com.alibaba.dubbo.remoting.transport.AbstractClient.connect(AbstractClient.java:282)
at com.alibaba.dubbo.remoting.transport.AbstractClient$1.run(AbstractClient.java:145)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
当连接建立完毕后,消费者就可以请求提供者的服务了,当请求到来,提供者这边会依次经过如下Handler的处理:
NettyCodecAdapter$InternalDecoder#messageReceived:对请求进行解码。
NettyHandler#messageReceived:根据Netty Channel来获取Dubbo的Channel,并开始调用Dubbo的Handler。
AbstractPeer#received:如果服务已经关闭,则返回,否则调用下一个Handler来处理。
MultiMessageHandler#received:如果是批量请求,则依次对请求调用下一个Handler来处理。
AllChannelHandler#received:该Dubbo的Handler非常重要,因为从这里是IO线程池和服务调用线程池的边界线,该Handler将服务调用操作直接提交给服务调用线程池并返回。
我们这里仔细看一下AllChannelHandler#received:
public void received(Channel channel, Object message) throws RemotingException {
// 获取服务调用线程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
我们注意到这里对execute进行了异常捕获,这是因为IO线程池是无界的(0-Integer.MAX_VALUE),但服务调用线程池是有界的,所以进行execute提交可能会遇到RejectedExecutionException异常,这也是为什么我们会在dubbo输出的日志中看到如下片段:
2017-06-16 22:01:03-WARN org.jboss.netty.channel.DefaultChannelPipeline- [DUBBO] An exception was thrown by a user handler while handling an exception event ([id: 0xad26fbf0, /消费者IP+端口=> /提供者IP+端口] EXCEPTION: com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .), dubbo version: 2.5.3, current host: 127.0.0.1 - New I/O worker #95
com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process caught event .
at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:67)
at com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate.caught(AbstractChannelHandlerDelegate.java:44)
at com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate.caught(AbstractChannelHandlerDelegate.java:44)
at com.alibaba.dubbo.remoting.transport.AbstractPeer.caught(AbstractPeer.java:127)
at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.exceptionCaught(NettyHandler.java:112)
at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.exceptionCaught(NettyCodecAdapter.java:165)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:148)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-提供者IP+端口, Pool Size: 300 (active: 274, core: 300, max: 300, largest: 300), Task: 145727238 (completed: 145726964), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://提供者IP+端口!
at com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:65)
... 19 more
当然,如果你没有指定,服务调用线程池默认的size是200,并且使用的是SynchronousQueue队列,请看FixedThreadPool#getExecutor实现:
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
可以看出,上面的错误日志正式在AbortPolicyWithReport中被输出的,我们看下AbortPolicyWithReport#rejectedExecution的实现:
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
throw new RejectedExecutionException(msg);
}
可以看出,这里输出的是WRAN级别的日志,并且抛出了RejectedExecutionException异常,那么问题来了,我们业务系统能否检测到这种情况?很遗憾,在这一步的时候,不仅没有走到Invoke的Filter环节,也没有真正开始进行服务调用,所以对服务接口配置的Filter或者AOP中都无法对这种情况进行处理。那我们该如何感知这种情况,一种方式是检测Dubbo输出的日志,第二种方式是消费者可以收到对应的RpcException,因为NettyCodecAdapter$InternalDecoder#exceptionCaught已经对该异常进行了处理,直接输出到了消费者端:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
ctx.sendUpstream(e);
}
消费者方可以用过Filter捕获到该异常,并输出日志:
com.alibaba.dubbo.rpc.RpcException: Failed to invoke remote method: doYouLoveMe, provider: dubbo://提供者IP+端口/...略..., cause: com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .
com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .
at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:58)
at com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(HeartbeatHandler.java:90)
at com.alibaba.dubbo.remoting.transport.MultiMessageHandler.received(MultiMessageHandler.java:25)
at com.alibaba.dubbo.remoting.transport.AbstractPeer.received(AbstractPeer.java:123)
at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.messageReceived(NettyHandler.java:91)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:148)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-提供者IP+端口, Pool Size: 300 (active: 274, core: 300, max: 300, largest: 300), Task: 145731584 (completed: 145731286), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://提供者IP+端口!
at com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:56)
... 16 more
如果经常出现该问题,说明提供者的处理能力跟不上消费者,最简单的解决办法就是将提供者的服务调用线程池数目调大点,比如:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />
这里我们为了保证模块内的主要服务有线程可用(防止次要服务抢占过多服务调用线程),可以对次要服务进行并发限制,例如:
<dubbo:protocol threads="500" />
<dubbo:service interface="xxxxx" version="1.0.0" ref="xxx1"
executes="100" >
<dubbo:method name="findAllPerson" executes="50" />
</dubbo:service>
如果服务调用线程池够用,则将直接创建ChannelEventRunnable对象并扔到服务调用线程池中执行。直接将事件交给DecodeHandler来处理,从这里开始调用的Handler如下:
DecodeHandler#received:对message的data进行解码,并交给下一个Handler.
HeaderExchangeHandler#received:这里会记录下读取时间,便于心跳任务检测(参见HeartBeatTask),通过下一个Handler执行完服务调用后往Channel写入应答数据。
DubboProtocol#reply:因为使用的是dubbo协议,所以这里是DubboProtocol,这里将直接通过Invoker来调用服务处理过程。
这里先给出DubboProtocol中的相关代码片段:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 根据请求参数来选择Invoker
Invoker<?> invoker = getInvoker(channel, inv);
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 直接执行服务调用过程
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// 服务调用key,即接口名+版本+提供者端口
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// 通过服务调用key找到对应的DubboExporter对象,exporterMap中是每个接口对应一个DubboExporter
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
// 返回invoker
return exporter.getInvoker();
}
代码都容易懂,为了能让用户接入服务调用的之前和之后,Dubbo使用了Filter机制,类似于Servlet中的Filter概念,Filter在消费者和提供者都有。当然,Dubbo里面也有一系列的内定Filter用来执行特定的功能,比如状态检查、异常处理等,这里我们就不讨论Dubbo中Filter执行顺序的问题了。为了大家能清晰的了解一笔请求会经历哪些内定的Filter,这里列出提供者在接收到请求后会执行的Filter链路(调用顺序从上到下):
invoker = {ProtocolFilterWrapper$1@3163} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {EchoFilter@3173} // 第一个调用的Filter,如果方法名是$echo,则仿照echo协议传什么返回什么
next = {ProtocolFilterWrapper$1@3174} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {ClassLoaderFilter@3177} // 将当前线程的ClassLoader切换成服务调用接口的ClassLoader,服务调用完毕再切换回来
next = {ProtocolFilterWrapper$1@3178} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {GenericFilter@3180} // 通过GenericService来调用的Dubbo服务才会执行里面的逻辑
next = {ProtocolFilterWrapper$1@3181} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {ContextFilter@3183} // 填充上下文RpcContext中的数据,服务调用完后清除
next = {ProtocolFilterWrapper$1@3184} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {TraceFilter@3188} // telnet的时候才会执行内部逻辑
next = {ProtocolFilterWrapper$1@3189} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {MonitorFilter@3192} // 如果需要使用监控功能,这里会统计并发数和异常数
next = {ProtocolFilterWrapper$1@3193} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {TimeoutFilter@3196} // 统计方法调用耗时,如果超过provider设置的时间,则输出告警日志
next = {ProtocolFilterWrapper$1@3197} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {ExceptionFilter@3199} // 遇到异常Dubbo会如何处理,大家一定得好好看看这个Filter的实现
next = {ProtocolFilterWrapper$1@3200} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {TokenFilter@3202} // 校验token,如果在provider中token设置成true,表明不允许consumer绕过注册中心直接调用provider的服务,注意,token是provider负责生成并注册到注册中心的
next = {ProtocolFilterWrapper$1@3203} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
invoker = {RegistryProtocol$InvokerDelegete@3172}
filter = {ProviderAFilter@3209} // 这里是业务方自定义的Filter
next = {RegistryProtocol$InvokerDelegete@3172}
invoker = {JavassistProxyFactory$1@3210} "registry://注册中心地址/com.alibaba.dubbo.registry.RegistryService?...略..."
InvokerWrapper.invoker = {JavassistProxyFactory$1@3210} "registry://服务提供者地址/com.alibaba.dubbo.registry.RegistryService?...略..."
wrapper = {Wrapper0@3215}
this$0 = {JavassistProxyFactory@3216}
proxy = {ServiceProviderImpl$$EnhancerByCGLIB$$14338e8a@3217} "com.manzhizhen.study.dubbo.ServiceProviderImpl@747af825" // 对Spring的服务类做动态代理处理
type = {Class@418} "interface com.manzhizhen.study.dubbo.ServiceProvider"
url = {URL@3218} "registry://注册中心地址/com.alibaba.dubbo.registry.RegistryService?...略..."
url = {URL@3211} "dubbo://服务提供者地址/com.manzhizhen.study.dubbo.ServiceProvider?...略..."
<!--StartFragment--> <!--EndFragment-->
最后执行的就是Wrapper对proxy的调用了,proxy指向的是Spring容器中的服务实例的代理类,这个代理类采用的是和Spring容器配置相关的代理形式(比如这里是CGLib而不是JDK的动态代理),而Wrapper是Dubbo用Javassist实现的代理类(关于Dubbo代理部分可以参看《Dubbo源代码实现二》),用来调用proxy。大家有兴趣可以看看Wrapper的源码。
相关推荐
### Apache Dubbo:服务提供者与消费者核心概念详解 #### 一、Apache Dubbo概览 **Apache Dubbo**是一款高性能、轻量级的开源微服务框架,最初由阿里巴巴内部开发并在2008年开始使用,随后在2011年开源。自2017年...
总的来说,Dubbo 2.8.4的源代码是一个深度学习分布式服务治理框架的好材料,它揭示了服务治理的核心机制和实现细节,对于理解微服务架构和提升Java开发能力大有裨益。通过深入研究,开发者可以更好地利用Dubbo来构建...
Apache Dubbo:Dubbo高级特性:服务降级与熔断实战 Dubbo是著名的RCP框架,文档内有干货,提供代码和可复现的命令,值得借鉴。
### Apache Dubbo:服务治理——服务路由与动态配置 #### 一、服务治理的重要性 在当前流行的微服务架构中,由于各个服务之间存在着频繁而复杂的交互,如何有效地管理和控制这些服务成为了确保整个系统稳定性和可...
【Dubbo源代码(2.5.4)】是一份重要的开源项目资源,它包含了Dubbo框架在2.5.4版本的完整源代码。Dubbo是中国阿里巴巴公司贡献的高性能、轻量级的服务治理框架,它专注于服务调用、监控和服务治理。这个版本的源...
- **配置文件方式**:在服务提供者和消费者的配置文件中,使用 `<dubbo:service>` 或 `<dubbo:reference>` 标签的 `version` 属性来指定服务版本。 - **服务提供者配置**: ```xml <dubbo:service interface=...
### Apache Dubbo:服务熔断与超时重试 #### 一、服务熔断基础 ##### 1.1 服务熔断的概念 服务熔断,作为一种重要的服务稳定性保障措施,在分布式系统中扮演着至关重要的角色。它的工作原理是,当某个服务节点...
### Apache Dubbo:Dubbo服务治理:负载均衡与容错机制 #### 一、Dubbo服务治理的重要性 在现代微服务架构中,服务间的交互变得日益频繁和复杂,因此需要一种有效的方式来管理这些服务,以确保整个系统的稳定性和...
### Apache Dubbo:Dubbo监控与运维:服务性能分析 #### 一、Dubbo监控概述 ##### 1.1 Dubbo监控的重要性 在现代微服务架构中,由于服务之间存在着复杂的调用关系,任何单一服务的性能问题都有可能对整体系统稳定...
在Dubbo中,服务提供者是实现业务逻辑的组件,它通过`@Service`注解将接口与其实现类绑定,暴露服务。例如,在`dubbo-samples-api`模块中定义的接口,会在`dubbo-samples-provider`模块中被具体实现并暴露。 2. **...
【Dubbo Server+Client 完整代码】是一个深入学习和实践Dubbo框架的实例项目,它涵盖了服务端(Server)和服务消费者端(Client)的完整实现。Dubbo是阿里巴巴开源的一款高性能、轻量级的Java远程服务框架,它强调了...
6. `<dubbo:provider>` 和 `<dubbo:consumer>`:这两个元素是服务提供者和服务消费者的高级配置,可以用来设置一些全局的属性。 7. `<dubbo:method>` 和 `<dubbo:argument>`:细化服务方法和参数的配置,如异步调用...
1. **服务接口与实现**:了解如何定义服务接口,并在服务提供者中实现接口,这是Dubbo服务的基础。 2. **Dubbo配置**:学习如何在XML或properties文件中配置服务提供者和消费者,包括服务元数据、传输协议、序列化...
dubbo分布式服务框架,方便大家对分布式服务的学习,方便对dubbo的扩展
- 在服务提供者端,通过`dubbo:service`标签配置服务接口、实现类、版本等信息,暴露服务。 - 在服务消费者端,通过`dubbo:reference`标签配置服务接口、版本等信息,引用远程服务。 4. **配置文件详解**: - `...
【标题】"Dubbo 2.8.4 源代码" 涵盖了分布式服务框架的核心技术,是阿里巴巴开源的一款高性能、轻量级的服务治理框架。它为开发者提供了微服务开发所需的诸多功能,包括服务注册与发现、负载均衡、调用链路监控等。...
4. 压缩包中的“com.dubbo.service”很可能包含了服务提供者的源代码,尤其是实现了业务逻辑的服务类和相关配置。 5. 在实际开发中,需要对服务提供者进行详细配置,如服务接口、实现类、版本、权重等,并且需要考虑...
如果使用dubbo遇到错误:com.alibaba.dubbo.remoting.RemotingException: Fail to decode request due to: RpcInvocation 请下载这个jar,替换掉你项目中的那个jar,应该可以解决。
1. 配置Spring:创建Spring配置文件,定义Bean,包括Mybatis的SqlSessionFactory和MapperScannerConfigurer,以及Dubbo的相关配置,如服务提供者和消费者。 2. 集成Mybatis:编写Mapper接口和对应的XML映射文件,...
这通常需要修改Dubbo Admin的源代码,并重新打包部署。 #### 三、Dubbo监控系统的配置与使用 ##### 3.1 配置Dubbo Admin Dubbo Admin的配置主要涉及以下几个方面: 1. **服务注册中心**:Dubbo Admin需要连接到...