一、使用方式
服务提供方不变,调用方代码如下:哪里要用在那里配置(局部配置,不会影像全局)
1 <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"> 2 <dubbo:method name="sayHello" async="true" timeout="60000"/> 3 <dubbo:method name="sayBye" async="true" timeout="60000"/> 4 </dubbo:reference>
配置里添加<dubbo:method name="xxx" async="true"/>,表示单个方法xxx使用异步方式;如果demoService下的所有方法都使用异步,直接配置为<dubbo:reference async="true"/>。
1 public static void main(String[] args) throws Exception { 2 //Prevent to get IPV6 address,this way only work in debug mode 3 //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not 4 System.setProperty("java.net.preferIPv4Stack", "true"); 5 6 asyncFuture2(); 7 } 8 9 public static void asyncFuture1() throws ExecutionException, InterruptedException { 10 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); 11 context.start(); 12 DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy 13 14 long start = System.currentTimeMillis(); 15 16 demoService.sayHello("zhangsan"); 17 Future<String> helloFuture = RpcContext.getContext().getFuture(); 18 19 demoService.sayBye("lisi"); 20 Future<String> byeFuture = RpcContext.getContext().getFuture(); 21 22 final String helloStr = helloFuture.get();//消耗5s 23 final String byeStr = byeFuture.get();//消耗8s 24 25 System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s 26 } 27 28 public static void asyncFuture2() throws ExecutionException, InterruptedException { 29 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); 30 context.start(); 31 DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy 32 33 long start = System.currentTimeMillis(); 34 35 Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan")); 36 Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi")); 37 38 final String helloStr = helloFuture.get();//消耗5s 39 final String byeStr = byeFuture.get();//消耗8s 40 41 System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s 42 }
Consumer启动主类。其中asyncFuture2()方法是推荐用法,注意Callable(asyncCall方法的入参)只是一个任务task,不会新建线程;所以asyncFuture2()和asyncFuture1()相似,资源占用相同,都是用一根线程进行异步操作的。
二、asyncFuture1()源码解析
先来看asyncFuture1(),总体步骤:
- demoService.sayHello("zhangsan"); 创建一个Future对象,存入当前线程的上下文中(发出一个线程,建立future对象用于之后的返回结果的存储,并不是立即返回的调用)此时将结果包装成Future放在rpc上下文中---已经开始执行代理方法
- Future<String> helloFuture = RpcContext.getContext().getFuture(); 从当前线程的上下文中获取第一步存入的Future对象
- final String helloStr = helloFuture.get(); 阻塞等待,从Future中获取结果---执行完目标方法可以更新此代理对象
代码主要执行流(代码详细执行流看文章开头的三篇博客):
1、demoService.sayHello("zhangsan");
-->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation) -->DubboInvoker.doInvoke(final Invocation invocation)
FutureFilter:
1 public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { 2 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); 3 4 fireInvokeCallback(invoker, invocation); 5 // need to configure if there's return value before the invocation in order to help invoker to judge if it's 6 // necessary to return future. 7 Result result = invoker.invoke(invocation); 8 if (isAsync) { 9 asyncCallback(invoker, invocation); 10 } else { 11 syncCallback(invoker, invocation, result); 12 } 13 return result; 14 }
对于如上异步操作(asyncFuture1()和asyncFuture2()),FutureFilter没起任何作用,该Filter主要会用在事件通知中,后续再说。
DubboInvoker.doInvoke(final Invocation invocation):
1 protected Result doInvoke(final Invocation invocation) throws Throwable { 2 RpcInvocation inv = (RpcInvocation) invocation; 3 final String methodName = RpcUtils.getMethodName(invocation); 4 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); 5 inv.setAttachment(Constants.VERSION_KEY, version); 6 7 ExchangeClient currentClient; 8 if (clients.length == 1) { 9 currentClient = clients[0]; 10 } else { 11 currentClient = clients[index.getAndIncrement() % clients.length]; 12 } 13 try { 14 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); 15 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); 16 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 17 if (isOneway) { //无返回值 18 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); 19 currentClient.send(inv, isSent); 20 RpcContext.getContext().setFuture(null); 21 return new RpcResult(); 22 } else if (isAsync) { //异步有返回值 23 ResponseFuture future = currentClient.request(inv, timeout); 24 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 25 return new RpcResult(); 26 } else { //同步有返回值 27 RpcContext.getContext().setFuture(null); 28 return (Result) currentClient.request(inv, timeout).get(); 29 } 30 } catch (TimeoutException e) { 31 throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 32 } catch (RemotingException e) { 33 throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 34 } 35 }
模式:
- 如果是isOneway(不需要返回值),不管同步还是异步,请求直接发出,不会创建Future,直接返回RpcResult空对象。
- 如果是isAsync(异步),则
- 先创建ResponseFuture对象,之后使用FutureAdapter包装该ResponseFuture对象;(创建ResponseFuture对象与同步的代码相同,最后得到的是一个DefaultFuture对象)
- 然后将该FutureAdapter对象设入当前线程的上下文中RpcContext.getContext();
- 最后返回空的RpcResult
- 如果是同步,则先创建ResponseFuture对象,之后直接调用其get()方法进行阻塞调用(见文章开头的三篇文章)
简单来看一下FutureAdapter:
1 public class FutureAdapter<V> implements Future<V> { 2 3 private final ResponseFuture future; 4 5 public FutureAdapter(ResponseFuture future) { 6 this.future = future; 7 } 8 9 public ResponseFuture getFuture() { 10 return future; 11 } 12 13 public boolean cancel(boolean mayInterruptIfRunning) { 14 return false; 15 } 16 17 public boolean isCancelled() { 18 return false; 19 } 20 21 public boolean isDone() { 22 return future.isDone(); 23 } 24 25 @SuppressWarnings("unchecked") 26 public V get() throws InterruptedException, ExecutionException { 27 try { 28 return (V) (((Result) future.get()).recreate()); 29 } catch (RemotingException e) { 30 throw new ExecutionException(e.getMessage(), e); 31 } catch (Throwable e) { 32 throw new RpcException(e); 33 } 34 } 35 36 @SuppressWarnings("unchecked") 37 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 38 int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS); 39 try { 40 return (V) (((Result) future.get(timeoutInMillis)).recreate()); 41 } catch (com.alibaba.dubbo.remoting.TimeoutException e) { 42 throw new TimeoutException(StringUtils.toString(e)); 43 } catch (RemotingException e) { 44 throw new ExecutionException(e.getMessage(), e); 45 } catch (Throwable e) { 46 throw new RpcException(e); 47 } 48 } 49 }
最后,回头看一下FutureFilter:
1 public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { 2 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); 3 4 fireInvokeCallback(invoker, invocation); 5 // need to configure if there's return value before the invocation in order to help invoker to judge if it's 6 // necessary to return future. 7 Result result = invoker.invoke(invocation); 8 if (isAsync) { 9 asyncCallback(invoker, invocation); 10 } else { 11 syncCallback(invoker, invocation, result); 12 } 13 return result; 14 }
1 private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { 2 Future<?> f = RpcContext.getContext().getFuture(); 3 if (f instanceof FutureAdapter) { 4 ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); 5 future.setCallback(new ResponseCallback() { 6 public void done(Object rpcResult) { 7 if (rpcResult == null) { 8 logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); 9 return; 10 } 11 ///must be rpcResult 12 if (!(rpcResult instanceof Result)) { 13 logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); 14 return; 15 } 16 Result result = (Result) rpcResult; 17 if (result.hasException()) { 18 fireThrowCallback(invoker, invocation, result.getException()); 19 } else { 20 fireReturnCallback(invoker, invocation, result.getValue()); 21 } 22 } 23 24 public void caught(Throwable exception) { 25 fireThrowCallback(invoker, invocation, exception); 26 } 27 }); 28 } 29 }
这里的future对象时之前创建好的DefaultFuture对象。
1 private volatile Response response; 2 private volatile ResponseCallback callback; 3 4 public boolean isDone() { 5 return response != null; 6 } 7 8 public void setCallback(ResponseCallback callback) { 9 if (isDone()) { 10 invokeCallback(callback); 11 } else { 12 boolean isdone = false; 13 lock.lock(); 14 try { 15 if (!isDone()) { 16 this.callback = callback; 17 } else { 18 isdone = true; 19 } 20 } finally { 21 lock.unlock(); 22 } 23 if (isdone) { 24 invokeCallback(callback); 25 } 26 } 27 }
这里判断响应是否已经返回了,如果返回了,直接执行invokeCallback(callback),否则将传入的ResponseCallback对象赋值给callback对象。
2、Future<String> helloFuture = RpcContext.getContext().getFuture();
RpcContext:
1 private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() { 2 @Override 3 protected RpcContext initialValue() { 4 return new RpcContext(); 5 } 6 }; 7 8 private Future<?> future; 9 10 public static RpcContext getContext() { 11 return LOCAL.get(); 12 } 13 14 public <T> Future<T> getFuture() { 15 return (Future<T>) future; 16 }
从当前线程上下文中获取之前存进去的FutureAdapter对象。
3、final String helloStr = helloFuture.get();
helloFuture是上述的FutureAdapter对象,其get()调用的是内部的DefaultFuture的get(),该方法与同步调用时相同,源码分析见文章开头的三篇文章。
1 public V get() throws InterruptedException, ExecutionException { 2 try { 3 return (V) (((Result) future.get()).recreate()); 4 } catch (RemotingException e) { 5 throw new ExecutionException(e.getMessage(), e); 6 } catch (Throwable e) { 7 throw new RpcException(e); 8 } 9 }
get方法的超时设置除了直接在xml中配置之外,还可以在代码中手动执行(优先级高)
1 final String helloStr2 = helloFuture.get(7000, TimeUnit.MILLISECONDS);
三、asyncFuture2()源码解析
下面来看一下asyncFuture2()源码:
1、Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));
1 public <T> Future<T> asyncCall(Callable<T> callable) { 2 try { 3 try { 4 setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); 5 // 1 执行传入的任务(此处创建FutureAdapter对象,并且设置到当前线程的RpcContext的future对象中) 6 final T o = callable.call(); 7 //local invoke will return directly 8 if (o != null) { 9 FutureTask<T> f = new FutureTask<T>(new Callable<T>() { 10 public T call() throws Exception { 11 return o; 12 } 13 }); 14 f.run(); 15 return f; 16 } else { 17 18 } 19 } catch (Exception e) { 20 throw new RpcException(e); 21 } finally { 22 removeAttachment(Constants.ASYNC_KEY); 23 } 24 } catch (final RpcException e) { 25 return new Future<T>() { 26 public boolean cancel(boolean mayInterruptIfRunning) { 27 return false; 28 } 29 30 public boolean isCancelled() { 31 return false; 32 } 33 34 public boolean isDone() { 35 return true; 36 } 37 38 public T get() throws InterruptedException, ExecutionException { 39 throw new ExecutionException(e.getCause()); 40 } 41 42 public T get(long timeout, TimeUnit unit) 43 throws InterruptedException, ExecutionException, 44 TimeoutException { 45 return get(); 46 } 47 }; 48 } 49 // 2 从当前线程的RpcContext中获取future对象 50 return ((Future<T>) getContext().getFuture()); 51 }
这里外层的catch的作用是什么?没搞清楚 https://github.com/alibaba/dubbo/issues/1346
2、final String helloStr = helloFuture.get();
与同步相同。
总结:dubbo异步与同步的差别:
- 同步:创建DefaultFuture之后,直接get阻塞等待;
- 异步:创建DefaultFuture之后,使用FutureAdapter进行包装,之后设置到当前线程的RpcContext中;后续用户在合适的时候自己从RpcContext获取future,之后get。(把阻塞放在别的地方,最后获取的时候阻塞)
相关推荐
【标题】:“Dubbo异步调用的优化共20页.pdf” 【描述】:这份文档详细...通过学习这份20页的文档,开发者不仅能掌握Dubbo异步调用的基本原理,还能获得实际操作的指导,从而在自己的项目中实现更高效的异步调用优化。
对原有的dubbo远程调用的异步的缺陷性进行了优化方案
5-42 业务系统集成Dubbo异步调用实现(2).mp4
Dubbo 的异步通信机制是基于 Apache MINA 框架的 Reactor 模型通信框架,使用单一长连接和 NIO 异步通讯,适合小数据量大并发的服务调用。 下面是 Dubbo 的基本原理机制的详细说明: 客户端调用远程接口 1. ...
实现Springcloud向dubbo项目的接口调用。 FeignToDubbo-starter模块负责利用Feign将底层的Http协议转化为dubbo协议,供SpringCloud项目使用。引入FeignToDubbo-starter后会引入dubbo的依赖,使用注解DubboRefence...
《Dubbo调用模块详解:深入理解异步调用与容错策略》 在分布式服务框架Dubbo中,调用模块扮演着至关重要的角色,它负责实现远程方法调用并确保返回结果的顺利获取。本篇文章将深入探讨Dubbo调用模块的基本组成,...
6. 如果配置了异步调用,消费者不会立即等待结果,而是通过回调或者Future对象获取结果。 总结,Dubbo的调用模块提供了丰富的功能和灵活的配置,以满足不同场景下的服务调用需求。通过深入理解这些组件及其交互,...
在.NET Remoting中,可以通过`AsyncCallBack`委托来处理异步调用的完成。在gRPC中,使用protobuf定义服务接口,客户端和服务器端都有对应的异步版本。 在实际应用中,我们还需要考虑错误处理、性能优化、连接管理和...
此外,Dubbo还提供了异步调用和长连接等功能,进一步提升性能。 四、负载均衡 Dubbo内置了多种负载均衡策略,如Random、RoundRobin、LeastActive等,可以根据实际场景选择合适的策略,确保请求均匀分发到各个服务...
在`dubbo-samples-async`模块中,展示了如何使用异步调用。 8. **监控(Monitor)** Dubbo提供了监控中心,可以收集服务的调用统计、性能指标等数据。在`dubbo-samples-monitor`模块中,你可以看到如何配置监控并...
异步调用web服务 oneWay When the Service Gateway is being used within WebSphere ESB or BPM Advanaced for a one way operation, a HTTP 200 response code
4. **处理异步调用**:由于Android主线程不能执行耗时操作,所以通常需要使用异步调用来调用Dubbo服务。可以使用Android的AsyncTask、Handler、Coroutines(Kotlin)等机制来处理异步回调。 5. **错误处理和重试...
Dubbo支持同步和异步调用模式,异步调用可以在不阻塞当前线程的情况下发起请求,提高系统并发处理能力。同时,还可以设置回调机制,处理调用结果。 总结,这个"Dubbo实例练习"涵盖了Dubbo的核心概念和实际操作,...
Dubbo通过异步回调和Future模式实现了非阻塞I/O,提升了系统的并发处理能力。 36-Dubbo的基本应用与高级应用则涵盖了从简单的服务发布、消费到复杂的监控、调优等多个方面。例如,通过监控中心,我们可以实时查看...
Dubbo支持同步调用和异步调用两种模式。异步调用可以显著提高系统并发处理能力,通过Future或Callback对象,消费者可以在服务完成后再进行后续处理,提高系统响应速度。 九、服务调用模型 Dubbo的调用模型分为直连...
Dubbo提供了多种调用方式,如同步、异步、单向等,可以根据业务需求选择。 6. **测试验证**:在消费者端,编写测试用例验证服务调用是否成功。如果一切正常,应能看到服务提供者返回的预期结果。 值得注意的是,...
Dubbo是一个高性能、轻量级的开源Java RPC框架,它实现了服务的发布、发现、负载均衡等功能,使得开发者可以方便地进行分布式服务调用。 在Dubbo的RPC调用流程中,主要涉及以下步骤: 1. **服务消费方发起调用**:...
- **Dubbo服务调用超时**:在一次服务调用中,如果调用方(Consumer)未能在预设时间内获得响应,则会认为该次调用超时。 - **雪崩效应**:当多个服务同时发生超时现象时,大量的请求会在短时间内集中到服务端,超出...