`
flychao88
  • 浏览: 751995 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Dubbo源码分析系列1---Dubbo异步通信

 
阅读更多

1、client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字。

 

2、将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object。

 

3、向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)。

 

4、将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去。

 

5、当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。

 

6、服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。

 

7、监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。

 

客户端部分源码:

//同步调用远程接口
public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
        byte protocol = getProtocol(control);
        if (!TRConstants.isValidProtocol(protocol)) {
            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
        }
        ResponseFuture future = invokeWithFuture(appRequest, control);
        return future.get();  //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback
}
public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
         byte protocol = getProtocol(control);
         long timeout = getTimeout(control);
         ConnectionRequest request = new ConnectionRequest(appRequest);
         request.setSerializeProtocol(protocol);
         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
         connection.sendRequestWithCallback(request, adapter, timeout);
         return adapter;
}

 

回调部分源码如下:

Callback2FutureAdapter implements ResponseFuture
public Object get() throws RemotingException, InterruptedException {
    synchronized (this) {  // 旋锁
        while (!isDone) {  // 是否有结果了
            wait(); //没结果是释放锁,让当前线程处于等待状态
        }
    }
    if (errorCode == TRConstants.RESULT_TIMEOUT) {
         throw new TimeoutException("Wait response timeout, request["
         + connectionRequest.getAppRequest() + "].");
    }
    else if (errorCode > 0) {
        throw new RemotingException(errorMsg);
    }
    else {
         return appResp;
    }
}
客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll()
public void handleResponse(Object _appResponse) {
         appResp = _appResponse; //将远程调用结果设置到callback中来
         setDone();
}
public void onRemotingException(int _errorType, String _errorMsg) {
         errorCode = _errorType;
         errorMsg = _errorMsg;
         setDone();
}
private void setDone() {
         isDone = true;
         synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了
             notifyAll(); // 唤醒处于等待的线程
         }
}

 

Dubbo通信部分源码:

 
// 用来存放请求和回调的MAP
private final ConcurrentHashMap<Long, Object[]> requestResidents;
 
//发送消息出去
void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {
         long requestId = connRequest.getId();
         long waitBegin = System.currentTimeMillis();
         long waitEnd = waitBegin + timeoutMs;
         Object[] queue = new Object[4];
         int idx = 0;
         queue[idx++] = waitEnd;
         queue[idx++] = waitBegin;   //用于记录日志
         queue[idx++] = connRequest; //用于记录日志
         queue[idx++] = callback;
         requestResidents.put(requestId, queue); // 记录响应队列
         write(connRequest);
 
         // 埋点记录等待响应的Map的大小
         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),
                   1L);
}
public void write(final Object connectionMsg) {
//mina里的IoSession.write()发送消息
         WriteFuture writeFuture = ioSession.write(connectionMsg);
         // 注册FutureListener,当请求发送失败后,能够立即做出响应
         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));
}
 
/**
* 在得到响应后,删除对应的请求队列,并执行回调
* 调用者:MINA线程
*/
public void putResponse(final ConnectionResponse connResp) {
         final long requestId = connResp.getRequestId();
         Object[] queue = requestResidents.remove(requestId);
         if (null == queue) {
             Object appResp = connResp.getAppResponse();
             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();
             StringBuilder sb = new StringBuilder();
             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");
             sb.append("from [").append(connResp.getHost()).append("],");
             sb.append("response type [").append(appRespClazz).append("].");
             LOGGER.warn(sb.toString());
             return;
         }
         int idx = 0;
         idx++;
         long waitBegin = (Long) queue[idx++];
         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];
         ResponseCallback callback = (ResponseCallback) queue[idx++];
         // ** 把回调任务交给业务提供的线程池执行 **
         Executor callbackExecutor = callback.getExecutor();
         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
 
         long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间
         logIfResponseError(connResp, duration, connRequest.getAppRequest());
}

 

以上代码是dubbo老版本的,但是思路却是和新版本完全一样的,在下一篇文章中会重点介绍新版本通信代码。。。。。未完待续。。。。。

分享到:
评论

相关推荐

    dubbo源码学习.zip--------------

    Dubbo使用Netty作为底层通信框架,实现了高性能的异步非阻塞IO模型,这在"IOboss"和"IOworker"两个关键组件中体现得尤为明显。IOboss负责管理网络连接,接收新连接,而IOworker则负责处理已连接的IO事件,执行读写...

    基于Dubbo的分布式系统架构-简易版支付系统源码

    Dubbo是由阿里巴巴开源的一款高性能、轻量级的Java RPC框架,它主要解决了服务间通信的问题,提供了包括服务注册、服务发现、负载均衡、调用链跟踪等一系列服务治理功能。在分布式系统中,Dubbo作为一个服务框架,...

    dubbo-zk-conf-kafka项目配置

    2. **事件驱动**:通过`Kafka` 消息队列,实现服务间的异步通信,提高系统的响应速度。 3. **数据流处理**:`Kafka` 集群可以作为数据流处理平台,将数据实时传递给下游的处理系统。 `core` 文件夹可能包含了项目的...

    Dubbo视频教程--高级篇--第24节--简易版支付系统介绍

    - **ActiveMQ**: 消息队列中间件,负责服务间的消息传递和异步通信,提高了系统的解耦和吞吐量。 - **Redis**: 分布式缓存,用于加速数据访问速度和减轻后端数据库压力。 #### 6. 部署结构 本教程中未详细描述具体...

    nacos-sentinel-dubbo-rocketmq-spring-cloud-example.zip

    本项目可能利用RocketMQ进行服务间的异步通信,确保消息的可靠传递,提高系统的吞吐量和响应速度。 具体到项目“nacos-sentinel-dubbo-rocketmq-spring-cloud-example”,我们可以期待看到以下内容: 1. Nacos的...

    SpringBoot+Dubbo构建的电商平台-微服务架构、商城、电商、微服务、高并发、kafka、Elasticsearch.zip

    在本项目中,我们主要探讨的是使用SpringBoot和Dubbo技术构建一个先进的电商平台,该平台采用了微服务架构,能够处理高并发流量,并整合了Kafka消息队列和Elasticsearch搜索引擎,以提升系统的稳定性和数据检索效率...

    dubbo-user-book.rar_SOA_dubbo_dubbo-user-book_分布式

    在服务监控方面,Dubbo集成了统计分析和健康检查功能,允许开发者实时了解服务的运行状态,如调用次数、成功率、平均响应时间等。这些数据对于定位问题和优化服务性能至关重要。 此外,Dubbo内置的容错机制保证了...

    dubbo学习 dubbo-dubbo-3.0.5.tar.gz

    - **Netty 通信框架**:Dubbo 使用 Netty 作为底层通信框架,提供高效的网络通信能力。 - **多协议支持**:Dubbo 支持 HTTP、gRPC 等多种协议,适应不同场景的需求。 综上所述,Dubbo 3.0.5 不仅是一个强大的...

    dubbo示例代码dubbo-sample

    Dubbo支持多种通信协议,如dubbo、rmi、hessian等。在`dubbo.properties`配置文件中,可以设置服务使用的通信协议,例如默认的dubbo协议。不同的协议会影响服务调用的性能和特性。 5. **负载均衡(Load Balance)*...

    Dubbo视频教程--高可用架构篇

    通过"edu-demo"系列文件,学习者可以了解如何配置和运行这些组件,以及在实际项目中如何结合使用消息队列实现微服务间的异步通信,提高系统的响应速度和容错能力。这些实践案例对于理解和掌握Dubbo在构建高可用架构...

    DUBBO多个版本的jar包

    1. **Dubbo版本介绍**: - Dubbo 2.5.x:这是Dubbo的早期版本,主要关注RPC(远程过程调用)功能,提供了服务提供者和服务消费者之间的通信能力,支持多种协议如HTTP、RMI、Hessian等,并引入了Zookeeper作为默认...

    dubbo的源码分析

    深入源码分析,我们可以看到Dubbo如何通过Netty框架来实现异步非阻塞的网络通信,以及如何通过Hessian或Protobuf等序列化库进行数据交换。 接下来,我们关注服务注册与发现。Dubbo提供了Zookeeper、Eureka、Redis等...

    dubbo源码分析-2(注册表AbstractRegistry设计技巧讲解)

    本文将深入分析"Dubbo源码分析-2(注册表AbstractRegistry设计技巧讲解)"这一主题,探讨`AbstractRegistry`的设计理念和实现细节。 `AbstractRegistry`是Dubbo中的抽象注册表类,它是所有具体注册表实现(如...

    dubbo-master

    2. **高性能:**基于 Netty 框架实现,提供了高效的异步通信能力。 3. **透明化:**服务调用就像调用本地方法一样简单,降低了分布式系统的复杂性。 4. **多种协议支持:**如 Dubbo 协议、RMI、HTTP、Hessian 等。 5...

    dubbo源码解析2.0.7z

    《Dubbo源码解析2.0》是一份深入剖析阿里巴巴开源框架Dubbo核心机制的资料,专注于2.0版本的源代码分析。Dubbo作为Java领域最知名的分布式服务框架之一,其设计理念、实现原理以及在实际应用中的优化策略都是开发者...

    dubbo入门案例与资料-java

    1. **全面兼容Java 8**:Dubbo 3.0 支持 Java 8 及以上版本,利用新特性和优化提升性能。 2. **升级Netty到4.x**:采用更高效的Netty 4.x 版本,提高网络通信效率。 3. **支持HTTP/2和gRPC**:除了原有的 Dubbo 协议...

    Dubbo-Zookeeper-Netty-SpringMVC, 使用dubbo注册服务,netty做服务器,springmvc提供restful接口.zip

    在本项目中,Dubbo被用来作为服务提供者和服务消费者之间的通信桥梁。服务提供者通过注册服务到注册中心,使得服务消费者可以通过服务名找到并调用相应的服务。Dubbo的优秀特性还包括服务的透明化调用、集群容错和...

    Dubbo 3 深度剖析 - 透过源码认识你

    分析源码还能帮助开发者学习到Dubbo 3的最佳实践,比如如何优雅地关闭服务、如何进行性能调优等,这些都是提升系统稳定性和效率的关键。 通过对《Dubbo 3 深度剖析 - 透过源码认识你》的学习,开发者能够从底层...

    dubbo-dubbo-2.5.3-codeanalysis:dubbo源码分析

    1. **Remoting 模块**:研究 Dubbo 如何建立客户端和服务端之间的通信链路。 2. **Protocol 模块**:了解 Dubbo 的服务暴露和引用机制。 3. **Invoker 和 Exporter**:深入理解服务的抽象表示和实际执行载体。 4. **...

    Dubbo高性能网关--Flurry介绍.pdf

    1. **基于Netty容器**:与通常使用Tomcat作为容器的传统网关不同,Flurry利用Netty的异步事件驱动模型,能够更有效地处理高并发请求。Netty的Reactive模式设计可以最大化利用CPU资源,提高单机处理能力。每个HTTP...

Global site tag (gtag.js) - Google Analytics