由于Dubbo底层采用Socket进行通信,自己对通信理理论也不是很清楚,所以顺便把通信的知识也学习一下。
通信理论
计算机与外界的信息交换称为通信。基本的通信方法有并行通信和串行通信两种。
1.一组信息(通常是字节)的各位数据被同时传送的通信方法称为并行通信。并行通信依靠并行I/O接口实现。并行通信速度快,但传输线根数多,只适用于近距离(相距数公尺)的通信。
2.一组信息的各位数据被逐位顺序传送的通信方式称为串行通信。串行通信可通过串行接口来实现。串行通信速度慢,但传输线少,适宜长距离通信。
串行通信按信息传送方向分为以下3种:
1) 单工
只能一个方向传输数据
2) 半双工
信息能双向传输,但不能同时双向传输
3) 全双工
能双向传输并且可以同时双向传输
Socket
Socket 是一种应用接口, TCP/IP 是网络传输协议,虽然接口相同, 但是不同的协议会有不同的服务性质。创建Socket 连接时,可以指定使用的传输层协议,Socket 可以支持不同的传输层协议(TCP 或UDP ),当使用TCP 协议进行连接时,该Socket 连接就是一个TCP 连接。Soket 跟TCP/IP 并没有必然的联系。Socket 编程接口在设计的时候,就希望也能适应其他的网络协议。所以,socket 的出现只是可以更方便的使用TCP/IP 协议栈而已。
引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html
上一个通信理论其实是想说Socket(TCP)通信是全双工的方式
Dubbo远程同步调用原理分析
从Dubbo开源文档上了解到一个调用过程如下图
http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference
另外文档里有说明:Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。
Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。
•连接个数:单连接
•连接方式:长连接
•传输协议:TCP
•传输方式:NIO异步传输
•序列化:Hessian二进制序列化
•适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
•适用场景:常规远程服务方法调用
通常,一个典型的同步远程调用应该是这样的:
1, 客户端线程调用远程接口,向服务端发送请求,同时当前线程应该处于“暂停“状态,即线程不能向后执行了,必需要拿到服务端给自己的结果后才能向后执行
2, 服务端接到客户端请求后,处理请求,将结果给客户端
3, 客户端收到结果,然后当前线程继续往后执行
Dubbo里使用到了Socket(采用apache mina框架做底层调用)来建立长连接,发送、接收数据,底层使用apache mina框架的IoSession进行发送消息。
查看Dubbo文档及源代码可知,Dubbo底层使用Socket发送消息的形式进行数据传递,结合了mina框架,使用IoSession.write()方法,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来说是一个异步的,即对于当前线程来说,将请求发送出来,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现了2个问题:
•当前线程怎么让它“暂停”,等结果回来后,再向后执行?
•正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
________________________________________
分析源代码,基本原理如下:
1.client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的
2.将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object
3.向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)
4.将ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去
5.当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
6.服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
7.监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。
这里还需要画一个大图来描述,后面再补了
需要注意的是,这里的callback对象是每次调用产生一个新的,不能共享,否则会有问题;另外ID必需至少保证在一个Socket连接里面是唯一的。
________________________________________
现在,前面两个问题已经有答案了,
•当前线程怎么让它“暂停”,等结果回来后,再向后执行?
答:先生成一个对象obj,在一个全局map里put(ID,obj)存放起来,再用synchronized获取obj锁,再调用obj.wait()让当前线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到obj,再用synchronized获取obj锁,再调用obj.notifyAll()唤醒前面处于等待状态的线程。
•正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。
这种做法不是第一次见了,10年在上一公司里,也是远程接口调用,不过走的消息中间件rabbitmq,同步调用的原理跟这类似,详见:rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
关键代码:
com.taobao.remoting.impl.DefaultClient.java //同步调用远程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; } |
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋锁 while (!isDone) { // 是否有结果了 wait(); //没结果是释放锁,让当前线程处于等待状态 } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //将远程调用结果设置到callback中来 setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了 notifyAll(); // 唤醒处于等待的线程 } } |
com.taobao.remoting.impl.DefaultConnection.java
// 用来存放请求和回调的MAP private final ConcurrentHashMap<Long, Object[]> requestResidents;
//发送消息出去 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { long requestId = connRequest.getId(); long waitBegin = System.currentTimeMillis(); long waitEnd = waitBegin + timeoutMs; Object[] queue = new Object[4]; int idx = 0; queue[idx++] = waitEnd; queue[idx++] = waitBegin; //用于记录日志 queue[idx++] = connRequest; //用于记录日志 queue[idx++] = callback; requestResidents.put(requestId, queue); // 记录响应队列 write(connRequest);
// 埋点记录等待响应的Map的大小 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 1L); } public void write(final Object connectionMsg) { //mina里的IoSession.write()发送消息 WriteFuture writeFuture = ioSession.write(connectionMsg); // 注册FutureListener,当请求发送失败后,能够立即做出响应 writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); }
/** * 在得到响应后,删除对应的请求队列,并执行回调 * 调用者:MINA线程 */ public void putResponse(final ConnectionResponse connResp) { final long requestId = connResp.getRequestId(); Object[] queue = requestResidents.remove(requestId); if (null == queue) { Object appResp = connResp.getAppResponse(); String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); StringBuilder sb = new StringBuilder(); sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); sb.append("from [").append(connResp.getHost()).append("],"); sb.append("response type [").append(appRespClazz).append("]."); LOGGER.warn(sb.toString()); return; } int idx = 0; idx++; long waitBegin = (Long) queue[idx++]; ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ResponseCallback callback = (ResponseCallback) queue[idx++]; // ** 把回调任务交给业务提供的线程池执行 ** Executor callbackExecutor = callback.getExecutor(); callbackExecutor.execute(newCallbackExecutorTask(connResp, callback));
long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间 logIfResponseError(connResp, duration, connRequest.getAppRequest()); } |
CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { final ConnectionResponse resp; final ResponseCallback callback; final Thread createThread;
CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { resp = _resp; callback = _cb; createThread = Thread.currentThread(); }
public void run() { // 预防这种情况:业务提供的Executor,让调用者线程来执行任务 if (createThread == Thread.currentThread() && callback.getExecutor() != DIYExecutor.getInstance()) { StringBuilder sb = new StringBuilder(); sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); sb.append("Can not callback task on the network io thhread."); LOGGER.warn(sb.toString()); return; }
if (TRConstants.RESULT_SUCCESS == resp.getResult()) { callback.handleResponse(resp.getAppResponse()); //设置调用结果 } else { callback.onRemotingException(resp.getResult(), resp .getErrorMsg()); //处理调用异常 } } } |
另外:
1, 服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理
同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调
相关推荐
Spring Cloud Alibaba、Dubbo和Nacos都是在这一领域中广泛使用的开源框架。本篇文章将深入探讨如何将这三个组件整合在一起,实现内部服务的有效调用。 首先,Spring Cloud Alibaba是阿里巴巴提供的一套微服务解决...
Dubbo 框架是一种分布式服务框架,旨在提供高性能和透明化的 RPC 远程服务调用方案,以及 SOA 服务治理方案。 Dubbo 框架的核心部分包含远程通讯、集群容错和自动发现三个部分。 在 Dubbo 框架中,服务提供者和...
### Dubbo框架RPC实现原理 #### Dubbo框架简介 Dubbo是由阿里巴巴B2B技术部于2011年12月开源的一个高性能、透明化的RPC远程服务调用框架。它主要应用于分布式系统的服务治理方案,提供了服务自动注册和发现、软负载...
1、Dubbo 远程调用实现 2、内带zookeeper-3.4.5消息服务 3、直接导入myeclipse运行:dubbo-server导入tomcat中运行 4、dubbo-client 运行测试类/dubbo-client/src/com/fengjx/main/Consumer.java
【标题】"dubbo框架的代码" Dubbo是阿里巴巴开源的一款高性能、轻量级的Java分布式服务框架,它旨在提高微服务架构中的服务治理效率,促进服务间的通信和协同。Dubbo的核心特性包括服务发现、服务调用、负载均衡、...
### Dubbo框架详解 #### 一、Dubbo框架概述 Dubbo是由阿里巴巴开源的一款高性能、轻量级的微服务框架,旨在提供一个基于Java的分布式服务解决方案。它可以帮助开发者更轻松地构建、部署和管理复杂的应用程序和服务...
Dubbo框架和SSM框架搭建。
Dubbo是一款由阿里巴巴开源的高性能、轻量级的Java RPC框架,它专注于服务治理,提供了服务注册、发现、调用、负载均衡、容错、监控等一系列完整的服务治理方案。本文档将深入探讨Dubbo的核心特性和在分布式架构中的...
Dubbo的主要目标是提供一个高性能、透明化的RPC(远程过程调用)框架,使得服务消费方可以像调用本地方法一样调用远程服务。它具备服务注册与发现、负载均衡、容错、调用链跟踪等核心功能,实现了服务之间的高效通信...
在IT行业中,分布式服务框架是构建大型复杂系统的关键技术之一,Dubbo作为阿里巴巴开源的一款高...开发者可以通过分析和运行这个案例,深入了解Dubbo和Zookeeper的协同工作原理,为实际项目中的服务治理打下坚实基础。
分布式Dubbo框架是一种高性能的Java RPC框架,它基于微服务的设计理念,用于实现服务的注册与发现,以及服务之间的通信。在公司级项目中,分布式Dubbo框架常用于构建微服务架构,将大型的单一应用程序拆分为一组小的...
Dubbo是中国阿里巴巴开源的一款高性能、轻量级的Java服务治理框架,它主要负责服务的发布、发现、调用以及监控。在"**dubbo提供与调用**"这个主题下,我们将深入探讨Dubbo的核心概念、配置文件以及Web配置。 1. **...
本文主要介绍了Spring整合Dubbo框架的过程和原理分析,包括Dubbo架构、服务提供者开发、服务消费者开发、服务注册中心、Dubbo原理分析和Spring整合Dubbo框架等。通过本文,读者可以了解Dubbo框架的原理和应用场景,...
阿里巴巴的Dubbo是一款高性能、轻量级的Java开源框架,主要用于实现服务间的远程调用,是分布式系统中的重要组成部分。Dubbo 2.5.1是其一个重要版本,为开发者提供了稳定且便捷的服务治理功能,对于学习和应用分布式...
SpringBoot2.0 整合 Dubbo 框架实现 RPC 服务远程调用方法 本文主要介绍了 SpringBoot2.0 整合 Dubbo 框架实现 RPC 服务远程调用方法的详细步骤和配置。在本文中,我们将详细介绍 Dubbo 框架的简介、核心角色说明、...
- **RPC(Remote Procedure Call)**:Dubbo的核心是基于接口的远程调用,允许服务提供者在自己的进程内暴露服务,而服务消费者可以在自己的进程内调用远程服务,就像调用本地方法一样。 - **服务注册与发现**:...
【标题】"Dubbo框架设计原则ppt"与【描述】"dubbo框架设计Java并发ppt"揭示了本次讨论的核心——Dubbo框架的设计原则以及在Java并发环境中的应用。这两个主题对于理解和优化分布式服务架构至关重要。 首先,让我们...
标题中的“nodejs使用原生的dubbo协议打通了dubbo的rpc方法调用”意味着在Node.js环境中,开发者成功地实现了对Dubbo服务的RPC(远程过程调用)访问,利用了Dubbo协议的特性。Dubbo是阿里巴巴开源的一个高性能、轻量...
1. **Dubbo**: Dubbo是阿里巴巴开源的一款高性能、轻量级的Java RPC框架,它能够帮助开发者实现服务之间的远程调用。在微服务架构中,Dubbo可以作为服务治理的核心,提供服务注册、服务发现、负载均衡、容错处理等...