`
wuhuajun
  • 浏览: 93870 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

异步长连接 实现方案(转)

    博客分类:
  • java
 
阅读更多

转自:http://blog.163.com/tsing_hua/blog/static/13962222420128195741354/

由于Dubbo底层采用Socket进行通信,自己对通信理理论也不是很清楚,所以顺便把通信的知识也学习一下。

n  通信理论

计算机与外界的信息交换称为通信。基本的通信方法有并行通信和串行通信两种。

1.一组信息(通常是字节)的各位数据被同时传送的通信方法称为并行通信。并行通信依靠并行IO接口实现。并行通信速度快,但传输线根数多,只适用于近距离(相距数公尺)的通信。

2.一组信息的各位数据被逐位顺序传送的通信方式称为串行通信。串行通信可通过串行接口来实现。串行通信速度慢,但传输线少,适宜长距离通信。

串行通信按信息传送方向分为以下3种:

1)   单工

只能一个方向传输数据

 

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

2)   半双工

信息能双向传输,但不能同时双向传输

 

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

3)   全双工

能双向传输并且可以同时双向传输

 

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境 

n  Socket

Socket 是一种应用接口, TCP/IP 是网络传输协议,虽然接口相同但是不同的协议会有不同的服务性质。创建Socket 连接时,可以指定使用的传输层协议,Socket 可以支持不同的传输层协议(TCP UDP ),当使用TCP 协议进行连接时,该Socket 连接就是一个TCP 连接。Soket TCP/IP 并没有必然的联系。Socket 编程接口在设计的时候,就希望也能适应其他的网络协议。所以,socket 的出现只是可以更方便的使用TCP/IP 协议栈而已。

引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html

上一个通信理论其实是想说Socket(TCP)通信是全双工的方式

n  Dubbo远程同步调用原理分析

Dubbo开源文档上了解到一个调用过程如下图

http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference

另外文档里有说明:Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

 

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO异步传输
  • 序列化:Hessian二进制序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串
  • 适用场景:常规远程服务方法调用

 通常,一个典型的同步远程调用应该是这样的:

【原创】Alibaba Dubbo框架同步调用原理分析-1 - sun - 学无止境

1, 客户端线程调用远程接口,向服务端发送请求,同时当前线程应该处于“暂停“状态,即线程不能向后执行了,必需要拿到服务端给自己的结果后才能向后执行

2, 服务端接到客户端请求后,处理请求,将结果给客户端
3, 客户端收到结果,然后当前线程继续往后执行
 
Dubbo里使用到了Socket(采用apache mina框架做底层调用)来建立长连接,发送、接收数据,底层使用apache mina框架的IoSession进行发送消息。
 
查看Dubbo文档及源代码可知,Dubbo底层使用Socket发送消息的形式进行数据传递,结合了mina框架,使用IoSession.write()方法,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来说是一个异步的,即对于当前线程来说,将请求发送出来,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现了2个问题:
  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给clientclient收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?

分析源代码,基本原理如下:
  1. client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong0开始累计数字的
  2. 将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object
  3. 向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)
  4. ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去
  5. 当前线程再使用callbackget()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callbackwait()方法,释放callback上的锁,让当前线程处于等待状态。
  6. 服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
  7. 监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callbackget()方法继续执行就能拿到调用结果了),至此,整个过程结束。
这里还需要画一个大图来描述,后面再补了
需要注意的是,这里的callback对象是每次调用产生一个新的,不能共享,否则会有问题;另外ID必需至少保证在一个Socket连接里面是唯一的。

现在,前面两个问题已经有答案了,
  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
     答:先生成一个对象obj,在一个全局map里put(ID,obj)存放起来,再用synchronized获取obj锁,再调用obj.wait()让当前线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到obj,再用synchronized获取obj锁,再调用obj.notifyAll()唤醒前面处于等待状态的线程。
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给clientclient收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
     答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。
 
 
接上一篇,看一下Dubbo的相关代码

关键代码:

com.taobao.remoting.impl.DefaultClient.java

//同步调用远程接口

public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {

        byte protocol = getProtocol(control);

        if (!TRConstants.isValidProtocol(protocol)) {

            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");

        }

        ResponseFuture future = invokeWithFuture(appRequest, control);

        return future.get() //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback

}

public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {

         byte protocol = getProtocol(control);

         long timeout = getTimeout(control);

         ConnectionRequest request = new ConnectionRequest(appRequest);

         request.setSerializeProtocol(protocol);

         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);

         connection.sendRequestWithCallback(request, adapter, timeout);

         return adapter;

}

 

 

Callback2FutureAdapter implements ResponseFuture

public Object get() throws RemotingException, InterruptedException {

synchronized (this) {  // 旋锁

    while (!isDone) {  // 是否有结果了

wait(); //没结果是释放锁,让当前线程处于等待状态

    }

}

if (errorCode == TRConstants.RESULT_TIMEOUT) {

    throw new TimeoutException("Wait response timeout, request["

    + connectionRequest.getAppRequest() + "].");

}

else if (errorCode > 0) {

    throw new RemotingException(errorMsg);

}

else {

    return appResp;

}

}

客户端收到服务端结果后,回调时相关方法,即设置isDone = truenotifyAll()

public void handleResponse(Object _appResponse) {

         appResp = _appResponse; //将远程调用结果设置到callback中来

         setDone();

}

public void onRemotingException(int _errorType, String _errorMsg) {

         errorCode = _errorType;

         errorMsg = _errorMsg;

         setDone();

}

private void setDone() {

         isDone = true;

         synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了

             notifyAll(); // 唤醒处于等待的线程

         }

}

 

com.taobao.remoting.impl.DefaultConnection.java

 

// 用来存放请求和回调的MAP

private final ConcurrentHashMap<Long, Object[]> requestResidents;

 

//发送消息出去

void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {

         long requestId = connRequest.getId();

         long waitBegin = System.currentTimeMillis();

         long waitEnd = waitBegin + timeoutMs;

         Object[] queue = new Object[4];

         int idx = 0;

         queue[idx++] = waitEnd;

         queue[idx++] = waitBegin;   //用于记录日志

         queue[idx++] = connRequest; //用于记录日志

         queue[idx++] = callback;

         requestResidents.put(requestId, queue); // 记录响应队列

         write(connRequest);

 

         // 埋点记录等待响应的Map的大小

         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),

                   1L);

}

public void write(final Object connectionMsg) {

//mina里的IoSession.write()发送消息

         WriteFuture writeFuture = ioSession.write(connectionMsg);

         // 注册FutureListener,当请求发送失败后,能够立即做出响应

         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));

}

 

/**

在得到响应后,删除对应的请求队列,并执行回调

调用者:MINA线程

*/

public void putResponse(final ConnectionResponse connResp) {

         final long requestId = connResp.getRequestId();

         Object[] queue = requestResidents.remove(requestId);

         if (null == queue) {

             Object appResp = connResp.getAppResponse();

             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();

             StringBuilder sb = new StringBuilder();

             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");

             sb.append("from [").append(connResp.getHost()).append("],");

             sb.append("response type [").append(appRespClazz).append("].");

             LOGGER.warn(sb.toString());

             return;

         }

         int idx = 0;

         idx++;

         long waitBegin = (Long) queue[idx++];

         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];

         ResponseCallback callback = (ResponseCallback) queue[idx++];

         // ** 把回调任务交给业务提供的线程池执行 **

         Executor callbackExecutor = callback.getExecutor();

         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));

 

         long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间

         logIfResponseError(connResp, duration, connRequest.getAppRequest());

}

 

CallbackExecutorTask

static private class CallbackExecutorTask implements Runnable {

         final ConnectionResponse resp;

         final ResponseCallback callback;

         final Thread createThread;

 

         CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {

             resp = _resp;

             callback = _cb;

             createThread = Thread.currentThread();

         }

 

         public void run() {

             // 预防这种情况:业务提供的Executor,让调用者线程来执行任务

             if (createThread == Thread.currentThread()

                       && callback.getExecutor() != DIYExecutor.getInstance()) {

                   StringBuilder sb = new StringBuilder();

                   sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");

                   sb.append("Can not callback task on the network io thhread.");

                   LOGGER.warn(sb.toString());

                   return;

             }

 

             if (TRConstants.RESULT_SUCCESS == resp.getResult()) {

                   callback.handleResponse(resp.getAppResponse()); //设置调用结果

             }

             else {

                   callback.onRemotingException(resp.getResult(), resp

                            .getErrorMsg());  //处理调用异常

             }

         }

}

 

另外:

1, 服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理

同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调

分享到:
评论

相关推荐

    [重要]基于Websphere MQ持久化消息实现异步转同步—方案二

    标题中的“基于Websphere MQ持久化消息实现异步转同步—方案二”是指在分布式系统中,通过使用Websphere MQ(WebSphere Message Broker,一种消息中间件)来处理异步通信,并通过消息的持久化特性,确保消息在异常...

    全异步SwooleAPI开发框架内置Promise异步MySQL连接池内存缓存管理异步Task方案等

    5. **异步Task方案**: 异步Task是Swoole框架中的一个强大特性,常用于处理后台任务,如发送邮件、执行长时间运算或数据处理等。通过创建独立的工作进程,异步Task可以将这些任务从主业务逻辑中分离出来,不会阻塞...

    异步Socket方式实现TCP网络通讯

    在使用异步Socket实现TCP网络通讯时,我们首先需要创建一个Socket对象,然后绑定到特定的IP地址和端口号,监听客户端的连接请求。 在服务器端,我们需要实现以下步骤: 1. 创建Socket实例,通常使用`new Socket...

    nodejs异步IO的实现

    随着JavaScript ES6及后续版本的引入,Node.js开始支持Promise和async/await语法,为异步编程提供了更优雅的解决方案。Promise可以链式调用,避免了回调地狱,而async/await则进一步提升了代码可读性,使得异步代码...

    异步编程的实现

    在JavaScript中,`async/await` 是异步编程的一种优雅解决方案。`async` 关键字用于定义一个异步函数,而`await` 关键字用于等待一个Promise对象的结果。这种方式让异步代码看起来更像同步代码,提高了可读性和可...

    异步处理设计方案

    异步处理设计方案是一种常见的优化策略,特别是在大型的BS(Browser-Server)系统中,它可以显著提升系统的响应速度和处理能力。在Z-Shop系统中,由于下订单操作涉及到大量数据库写操作,以及需要向多个统计分析表...

    Mina长连接框架实现Android客户端与服务器端通信

    在Android客户端与服务器端通信中,Mina框架因其高效的异步I/O处理能力,常被用来构建长连接,实现稳定且低延迟的数据传输。 长连接是指在通信过程中,一旦建立起连接,就保持该连接不关闭,直到通信结束或出现异常...

    C#实现Socket异步通信完整封装库源码

    总的来说,C#实现的Socket异步通信封装库源码为开发者提供了一套完整的网络通信解决方案,涵盖了服务器监听、客户端连接、服务抽象以及异步操作的管理。这种封装使得开发者能够更加专注于业务逻辑,而不用过于关心...

    php 长连接服务器端

    PHP实现长连接通常通过Comet技术或者WebSocket协议来完成。题目中提到的“comet”很可能是指这个技术。Comet是一种服务器推送技术,它允许服务器主动向客户端发送数据,而不是等待客户端发起请求。这种方式常用于...

    异步电机串级调速系统方案.doc

    异步电机串级调速系统方案 ...异步电机串级调速系统方案是指将异步电机的转子绕组与外界实现电气联接,以实现转差功率馈送式调速的方法。这种调速方法具有良好的调速性能和效率,但需要增加一些设备。

    CurlmultiThread.rar

    然而,长连接并不总是理想的解决方案,例如在高并发场景下,如果连接过多,可能会导致服务器资源紧张。因此,开发者需要根据具体应用场景权衡长连接和短连接的优缺点。 在多线程环境中,长连接的使用需格外谨慎,...

    (通用异步收发器)与蓝牙的接口连接

    ### UART(通用异步收发器)与蓝牙的接口连接 #### 一、引言 随着无线技术的迅速发展,蓝牙技术作为一种新兴的无线通信标准,已经在市场上占据了重要的位置。蓝牙技术不仅因其便利性受到用户的青睐,同时也因其...

    异步十六进制加法计数器(上升沿触发)(D)(设计方案1、2).zip

    在异步计数器中,多个D触发器通过反馈网络连接,每个触发器的时钟输入通常会受到前一个触发器的输出控制,形成级联结构。设计时序图时,关键是要确保每个触发器在正确的时间被激活,以便按照正确的顺序更新计数值。 ...

    PLC控制交流异步电动机正反转实验报告.doc

    在PLC控制中,通过编写程序改变输出端口的状态,进而改变电动机的三相电源连接顺序,实现电动机的正反转。 **三、实验设备** 实验所需设备包括计算机、PLC实验装置、实验挂箱、三相鼠笼式异步电动机以及各种导线。 ...

    基于WPF的UDP异步组播C#.rar

    总结来说,这个示例提供了一个基于WPF、C#的UDP异步组播解决方案,它利用了C#的异步编程特性,优化了UI性能,同时封装成了易于使用的DLL库。如果你在WPF应用中需要实现UDP组播功能,这个示例将是一个非常有价值的...

    《深入浅出Node.js》:Node异步编程解决方案之事件发布-订阅模式.docx

    《深入浅出Node.js》这本书深入探讨了Node.js的异步编程解决方案,特别是事件发布-订阅模式。在Node.js中,异步编程是其核心特性,它利用非阻塞I/O和事件驱动机制来提高性能,避免了多线程带来的复杂性。异步编程的...

    ALTERA器件中复位电路实现之异步复位同步化

    异步复位同步化技术在FPGA设计中提供了一种平衡复位速度与时序分析复杂性的方案。通过对异步复位信号进行适当的处理,可以在确保系统稳定的同时,提高系统的响应速度。通过合理的布局和时序约束,可以有效地避免亚...

    oracle 数据库,在C++中用连接池实现高速连接与访问.rar

    本话题将深入探讨如何在C++环境中利用连接池技术实现对Oracle数据库的高速连接与访问。 一、Oracle数据库基础 Oracle数据库提供了一套完整的数据管理解决方案,包括事务处理、数据仓库、网络数据库、安全性等。它的...

    TCP异步聊天客户端

    本篇主要介绍一个基于TCP协议的异步聊天客户端实现方案,其中包括私聊与群聊功能的实现思路与具体代码注释。TCP(Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议,它为两台...

    基于java的进程间异步通信系统的设计与实现.pdf

    《基于Java的进程间异步通信系统的设计与实现》探讨的是如何在Java环境中构建一个高效、可靠的进程间通信系统,特别是在分布式应用系统开发中的重要性。进程间通信(IPC)是分布式系统的基础,它决定了系统的性能和...

Global site tag (gtag.js) - Google Analytics