`
jimmee
  • 浏览: 538013 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

UDT协议-基于UDP的可靠数据传输协议的实现分析(4)-发送和接收的算法

阅读更多

0. 计时器

udt有四种计时器:

ACK, NAK, EXP and SND


1. 发送端的算法

数据结构和变量:
1)SenderLossList: 记录发送方丢失的包的列表,根据序号升序排列
2)sendBuffer: 记录发送过的包和序号

发送算法:

1)如果丢失列表不为空,则重传这些packet包,并从丢失列表中移出,到5)
2)若应用层有数据,则执行发送
3) 进行检查
   a. 若未确认的包的数量超过流量窗口的大小,则回到1)
   b. 组织一个新的数据包,之后发送出去
4) 若当前的包的序号是16n(16的倍数),回到2),此处的作用是pair packet,可用于估计带宽的
5) 记录包的发送时间到SND PKT History Window中
6)如果属于发送速率降低后发送的第一个包,则等待SYN的时间
7) 得到STP-t的时间, t是step 1 to step 4 花费的时间. 回到 1).

算法的核心:
(1)为了保证可靠性,没有发送成功的包,需要重传
(2)可以发送的包数量受拥塞窗口和滑动窗口的限制
(3)根据发送速率的控制,两个包之间的发送时间间隔需要控制

udt-java版本实现的算法,基本一致
   

/**
     * sender algorithm
     */
    long iterationStart;
    public void senderAlgorithm()throws InterruptedException, IOException{
        while(!paused){
            // 本次迭代的时间
            iterationStart=Util.getCurrentTime();
            // 步骤1):if the sender's loss list is not empty
            // 移出
            Long entry=senderLossList.getFirstEntry();
            if(entry!=null){
                // 执行重传
                handleRetransmit(entry);
            }
            else
            {
                // 步骤2) 若应用层有数据,则执行发送
                // 步骤3) 进行检查
                //if the number of unacknowledged data packets does not exceed the congestion
                //and the flow window sizes, pack a new packet
                int unAcknowledged=unacknowledged.get();
               
                // 未确认的包的数量小于拥塞窗口大小和流控制窗口
                if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
                        && unAcknowledged<session.getFlowWindowSize()){
                    //check for application data
                    DataPacket dp=flowWindow.consumeData();
                    if(dp!=null){
                        send(dp);
                        largestSentSequenceNumber=dp.getPacketSequenceNumber();
                    }
                    else{
                        statistics.incNumberOfMissingDataEvents();
                    }
                }else{
                    //congestion window full, wait for an ack
                    if(unAcknowledged>=session.getCongestionControl().getCongestionWindowSize()){
                        statistics.incNumberOfCCWindowExceededEvents();
                    }
                   
                    // udt-java自己加入的处理方式
                    waitForAck();
                }
            }

            //wait
            if(largestSentSequenceNumber % 16 !=0){
                // 步骤7)
                long snd=(long)session.getCongestionControl().getSendInterval();
                long passed=Util.getCurrentTime()-iterationStart;
                int x=0;
                while(snd-passed>0){
                    //can't wait with microsecond precision :(
                    if(x==0){
                        statistics.incNumberOfCCSlowDownEvents();
                        x++;
                    }
                    passed=Util.getCurrentTime()-iterationStart;
                    if(stopped)return;
                }
            }
        }
    }

 
2. 接收端的算法

数据结构和变量
1)ReceiverLossListEntry和ReceiverLossList: ReceiverLossListEntry记录了丢失的packet的序号和反馈的时间,
ReceiverLossList是ReceiverLossListEntry的列表,根据序号升序排列
2)AckHistoryEntry和AckHistoryWindow:AckHistoryEntry记录发送的确认序列号和发送时间信息,AckHistoryWindow
是AckHistoryEntry的循环队列
3)PacketHistoryWindow:记录到达的package的信息
4)PacketPairWindow:记录两个probing packet pair之间的时间间隔
5) largestReceivedSeqNumber(LRSN): 接收到的最大的确认的序号
6) exp-count: A variable to record number of continuous EXP timeout
   events and its initial value is 1.

接收算法:
1)检查ACK, NAK, or EXP timer是否到期可以处理了
2) 执行带时间限制的poll操作接收包,若没有到达的包,回到步骤1)
3) 设置exp-count=1,更新ETP: ETP = RTT + 4 * RTTVar + ATP.
4) 当所有的包都确认后,重置EXP的过期时间
5) 检查是否是控制包,是的话,处理完之后,回到1)
6) 若当前接收到的包序号是16n + 1, 在Packet Pair Window中保存此包和上一个包之间的时间间隔
7) 记录到达包到PKT History Window中.
8) a. 若当前序号大于上次LRSN+1,则将LRSN到本序号之间的包放到接收丢失列表中,并发送NAK包
   b. 若当前序号小于LRSN,则从接receiver's loss list中移出
9) 更新LRSN. 回到1).

udt-java的实现:

 */
    public void receiverAlgorithm()throws InterruptedException,IOException{
        // 1) 步骤1,检查各种定时器
        //check ACK timer
        long currentTime=Util.getCurrentTime();
        if(nextACK<currentTime){
            nextACK=currentTime+ackTimerInterval;
            processACKEvent(true);
        }
        //check NAK timer
        if(nextNAK<currentTime){
            nextNAK=currentTime+nakTimerInterval;
            processNAKEvent();
        }

        //check EXP timer
        if(nextEXP<currentTime){
            nextEXP=currentTime+expTimerInterval;
            processEXPEvent();
        }
        // 步骤2)perform time-bounded UDP receive
        UDTPacket packet=handoffQueue.poll(Util.getSYNTime(), TimeUnit.MICROSECONDS);
        if(packet!=null){
            // 有数据,步骤3)
            //reset exp count to 1
            expCount=1;
            // 步骤4),实现有偏差
            //If there is no unacknowledged data packet, or if this is an
            //ACK or NAK control packet, reset the EXP timer.
            boolean needEXPReset=false;
            if(packet.isControlPacket()){
                ControlPacket cp=(ControlPacket)packet;
                int cpType=cp.getControlPacketType();
                if(cpType==ControlPacketType.ACK.ordinal() || cpType==ControlPacketType.NAK.ordinal()){
                    needEXPReset=true;
                }
            }
           
            if(needEXPReset){
                nextEXP=Util.getCurrentTime()+expTimerInterval;
            }
            if(storeStatistics)processTime.begin();
            // 处理packet
            processUDTPacket(packet);
           
            if(storeStatistics)processTime.end();
        }
       
        Thread.yield();
    }

    // 处理数据包
    protected void onDataPacketReceived(DataPacket dp)throws IOException{
        long currentSequenceNumber = dp.getPacketSequenceNumber();
       
        //for TESTING : check whether to drop this packet
//        n++;
//        //if(dropRate>0 && n % dropRate == 0){
//            if(n % 1111 == 0){   
//                logger.info("**** TESTING:::: DROPPING PACKET "+currentSequenceNumber+" FOR TESTING");
//                return;
//            }
//        //}
        // 将数据放到接收缓存中
        boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
        if(!OK){
            //need to drop packet...
            return;
        }
       
        long currentDataPacketArrivalTime = Util.getCurrentTime();

        /* 步骤6)(4).if the seqNo of the current data packet is 16n+1,record the
        time interval between this packet and the last data packet
        in the packet pair window*/
        if((currentSequenceNumber%16)==1 && lastDataPacketArrivalTime>0){
            long interval=currentDataPacketArrivalTime -lastDataPacketArrivalTime;
            packetPairWindow.add(interval);
        }
       
        // 步骤7)(5).record the packet arrival time in the PKT History Window.
        packetHistoryWindow.add(currentDataPacketArrivalTime);

       
        //store current time
        lastDataPacketArrivalTime=currentDataPacketArrivalTime;

       
        // 步骤8)
        //(6).number of detected lossed packet
        /*(6.a).if the number of the current data packet is greater than LSRN+1,
            put all the sequence numbers between (but excluding) these two values
            into the receiver's loss list and send them to the sender in an NAK packet*/
        if(SequenceNumber.compare(currentSequenceNumber,largestReceivedSeqNumber+1)>0){
            sendNAK(currentSequenceNumber);
        }
        else if(SequenceNumber.compare(currentSequenceNumber,largestReceivedSeqNumber)<0){
                /*(6.b).if the sequence number is less than LRSN,remove it from
                 * the receiver's loss list
                 */
                receiverLossList.remove(currentSequenceNumber);
        }

        statistics.incNumberOfReceivedDataPackets();
       
        // 步骤9),更新LRSN
        //(7).Update the LRSN
        if(SequenceNumber.compare(currentSequenceNumber,largestReceivedSeqNumber)>0){
            largestReceivedSeqNumber=currentSequenceNumber;
        }

        //(8) need to send an ACK? Some cc algorithms use this
        if(ackInterval>0){
            if(n % ackInterval == 0)processACKEvent(false);
        }
    }

 

ACK Event处理:
1)确定ACK number: 如果receiver's loss list为空,ACK number为LRSN + 1;否则
ACK number为receiver's loss list中的最小值
2) ACK number大于ACK2,或者ACK number等于上次的ACK number并且时间间隔小于2RTT,则结束这个ACK的发送
3) 设置ACK sequence number
4) 计算包到达的速率,算法如下:计算PKT History Window中最后16个到达间隔的均值(AI),
移除间隔大于AI*8或者小于AI/8的均值,若剩下的数量大于8, 重新计算AI',the packet arrival speed is
1/AI' (number of packets per second); 否则返回0.
5) 计算滑动窗口effective flow window size as: max(min(W, available receiver buffer size), 2)
6) 计算estimated link capacity,算法如下:如果是quick start phase阶段,返回0,否则计算Packet Pair Window
中的最后16个packet pair的到达均值(PI),link capacity is 1/PI (number of packets per second).
7) 组装ACK包并发送:Pack the ACK sequence number, ACK number, RTT, RTT variance,
   effective flow window size, and estimated link capacity into the
   ACK packet and send it out.
8) 记录发送的ACK信息到ACK History Window中

          

     protected void processACKEvent(boolean isTriggeredByTimer)throws IOException{
        //步骤1)(1).Find the sequence number *prior to which* all the packets have been received
        final long ackNumber;
        ReceiverLossListEntry entry=receiverLossList.getFirstEntry();
        if (entry==null) {
            ackNumber = largestReceivedSeqNumber + 1;
        } else {
            ackNumber = entry.getSequenceNumber();
        }
        // 步骤2)(2).a) if ackNumber equals to the largest sequence number ever acknowledged by ACK2
        if (ackNumber == largestAcknowledgedAckNumber){
            //do not send this ACK
            return;
        }else if (ackNumber==lastAckNumber) {
            //or it is equals to the ackNumber in the last ACK 
            //and the time interval between these two ACK packets
            //is less than 2 RTTs,do not send(stop)
            long timeOfLastSentAck=ackHistoryWindow.getTime(lastAckNumber);
            if(Util.getCurrentTime()-timeOfLastSentAck< 2*roundTripTime){
                return;
            }
        }
        final long ackSeqNumber;
        //if this ACK is not triggered by ACK timers,send out a light Ack and stop.
        if(!isTriggeredByTimer){
            ackSeqNumber=sendLightAcknowledgment(ackNumber);
            return;
        }
        else{
            // 步骤7)
            //pack the packet speed and link capacity into the ACK packet and send it out.
            //(7).records  the ACK number,ackseqNumber and the departure time of
            //this Ack in the ACK History Window
            ackSeqNumber=sendAcknowledgment(ackNumber);
        }
       
        // 步骤8)
        AckHistoryEntry sentAckNumber= new AckHistoryEntry(ackSeqNumber,ackNumber,Util.getCurrentTime());
        ackHistoryWindow.add(sentAckNumber);
        //store ack number for next iteration
        lastAckNumber=ackNumber;
    }

 
NAK Event处理:

找到feedback time大于k*RTT的包,发送丢失信息
   

protected void processNAKEvent()throws IOException{
        //find out all sequence numbers whose last feedback time larger than is k*RTT
        List<Long>seqNumbers=receiverLossList.getFilteredSequenceNumbers(roundTripTime,true);
        sendNAK(seqNumbers);
    }

 

EXP Event处理:
1)将所有未确认的包放到发送丢失列表中
2)如果ExpCount > 16 并且具体距离上次操作的时间超过3s 或者 3min已过,则关闭UDT并退出
3) 如果发送丢失列表为空,则发送keep-alive packet包
4) Increase ExpCount by 1.

  

  protected void processEXPEvent()throws IOException{
        if(session.getSocket()==null || !session.getSocket().isActive())return;
        UDTSender sender=session.getSocket().getSender();
        //put all the unacknowledged packets in the senders loss list
        sender.putUnacknowledgedPacketsIntoLossList();
        if(expCount>16 && System.currentTimeMillis()-sessionUpSince > IDLE_TIMEOUT){
            if(!connectionExpiryDisabled &&!stopped){
                sendShutdown();
                stop();
                logger.info("Session "+session+" expired.");
                return;
            }
        }
        if(!sender.haveLostPackets()){
            sendKeepAlive();
        }

 

On ACK packet处理:

1) Update the largest acknowledged sequence number.
2) Send back an ACK2 with the same ACK sequence number in this ACK.
3) Update RTT and RTTVar.
4) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN.
5) Update flow window size.
6) If this is a Light ACK, stop.
7) Update packet arrival rate: A = (A * 7 + a) / 8, where a is the
   value carried in the ACK.
8) Update estimated link capacity: B = (B * 7 + b) / 8, where b is
   the value carried in the ACK.
9) Update sender's buffer (by releasing the buffer that has been
   acknowledged).
10) Update sender's loss list (by removing all those that has been
      acknowledged).

   

protected void onAcknowledge(Acknowledgement acknowledgement)throws IOException{
        ackLock.lock();
        ackCondition.signal();
        ackLock.unlock();

        CongestionControl cc=session.getCongestionControl();
        long rtt=acknowledgement.getRoundTripTime();
        if(rtt>0){
            long rttVar=acknowledgement.getRoundTripTimeVar();
            // 步骤3) 更新rtt和rttVar
            cc.setRTT(rtt,rttVar);
            statistics.setRTT(rtt, rttVar);
        }
        // 步骤7),步骤8), 更新包的速率和估计带宽
        long rate=acknowledgement.getPacketReceiveRate();
        if(rate>0){
            long linkCapacity=acknowledgement.getEstimatedLinkCapacity();
            cc.updatePacketArrivalRate(rate, linkCapacity);
            statistics.setPacketArrivalRate(cc.getPacketArrivalRate(), cc.getEstimatedLinkCapacity());
        }

        long ackNumber=acknowledgement.getAckNumber();
        cc.onACK(ackNumber);
        statistics.setCongestionWindowSize((long)cc.getCongestionWindowSize());
        // 步骤9),10)
        //need to remove all sequence numbers up the ack number from the sendBuffer
        boolean removed=false;
        for(long s=lastAckSequenceNumber;s<ackNumber;s++){
            synchronized (sendLock) {
                // 发送缓存里移除
                removed=sendBuffer.remove(s)!=null;
                // 发送丢失列表里移除
                senderLossList.remove(s);
            }
            if(removed){
                unacknowledged.decrementAndGet();
            }
        }
        // 步骤1)
        lastAckSequenceNumber=Math.max(lastAckSequenceNumber, ackNumber);   
        // 步骤2)
        //send ACK2 packet to the receiver
        sendAck2(ackNumber);
        statistics.incNumberOfACKReceived();
        if(storeStatistics)statistics.storeParameters();
    }

 

On NAK packet处理:
1) Add all sequence numbers carried in the NAK into the sender's loss
      list.
2) Update the SND period by rate control
3) Reset the EXP time variable.

On ACK2 packet处理:
1) Locate the related ACK in the ACK History Window according to the
   ACK sequence number in this ACK2.
2) Update the largest ACK number ever been acknowledged.
3) Calculate new rtt according to the ACK2 arrival time and the ACK
   departure time, and update the RTT value as: RTT = (RTT * 7 +
      rtt) / 8.
4) Update RTTVar by: RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4.
5) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN.

   

 protected void onAck2PacketReceived(Acknowledgment2 ack2){
        AckHistoryEntry entry=ackHistoryWindow.getEntry(ack2.getAckSequenceNumber());
        if(entry!=null){
            long ackNumber=entry.getAckNumber();
            largestAcknowledgedAckNumber=Math.max(ackNumber, largestAcknowledgedAckNumber);
           
            long rtt=entry.getAge();
            if(roundTripTime>0)roundTripTime = (roundTripTime*7 + rtt)/8;
            else roundTripTime = rtt;
            roundTripTimeVar = (roundTripTimeVar* 3 + Math.abs(roundTripTimeVar- rtt)) / 4;
            ackTimerInterval=4*roundTripTime+roundTripTimeVar+Util.getSYNTime();
            nakTimerInterval=ackTimerInterval;
            statistics.setRTT(roundTripTime, roundTripTimeVar);
        }
    }

 

On message drop request received处理:
   1) Tag all packets belong to the message in the receiver buffer so
      that they will not be read.
   2) Remove all corresponding packets in the receiver's loss list.

On Keep-alive packet received处理:
   Do nothing.

On Handshake/Shutdown packet received处理:后续分析

分享到:
评论

相关推荐

    UDT协议-基于UDP的可靠数据传输协议---UDT实现源码

    UDT协议的源码分析可以帮助我们深入理解其工作原理,如何在不牺牲可靠性的前提下实现高效的UDP数据传输。通过阅读源码,我们可以学习到如何设计和实现一个高性能的传输层协议,这对于网络编程和大数据传输领域的研究...

    UDT协议-基于UDP的可靠数据传输协议.docx

    UDT(User Datagram Transport)协议是一种基于UDP(User Datagram Protocol)的可靠数据传输协议,设计目的是为了解决在高带宽延迟乘积(Bandwidth-Delay Product, BDP)环境中,传统的TCP协议效率低下的问题。...

    UDT协议-基于UDP的可靠数据传输协议.pdf

    UDT(User Datagram Transport)协议是一种基于用户数据报协议(UDP)的可靠数据传输协议,设计用于解决在高带宽时延乘积(BDP)网络环境下的传输效率和公平性问题。传统的TCP协议在BDP较大时表现不佳,因为其拥塞...

    UDT协议-基于UDP的可靠数据传输协议 (2).pdf

    UDT(User Datagram Transport)协议是一种基于UDP(User Datagram Protocol)的可靠数据传输协议,主要设计用于处理高带宽时延乘积(BDP)环境下的高效数据传输。在传统的TCP协议中,由于Additive Increase ...

    UDT协议-基于UDP的可靠数据传输协议 (2).docx

    UDT(User Datagram Transport)协议是一种基于用户数据报协议(UDP)的可靠数据传输协议,设计初衷是为了在高带宽时延乘积(BDP)的网络环境中提供高效、公平和稳定的传输服务,尤其适用于大数据量传输场景,如网格...

    udt UDT网络传输协议开源包,是基于UDP的可靠传输协议

    7. **API设计**:UDT提供了用户友好的API,使得开发者能够方便地集成到自己的应用程序中,进行可靠的UDP数据传输。 总的来说,UDT协议在保持UDP的高效性的同时,提供了类似TCP的可靠性,是那些对传输速度和可靠性都...

    UDT协议基于UDP的可靠数据传输协议.pdf

    UDT(User Datagram Transport)协议是一种基于用户数据报协议(UDP)的可靠数据传输协议,设计目的是为了在高带宽时延乘积(BDP)的网络环境中提供高效、公平和稳定的传输服务。传统的TCP协议在面对高BDP时表现不佳...

    udt-java 可靠UDP传输 源码

    UDT(UDP-based Data Transfer Protocol)是为了解决TCP在大数据传输时的效率问题而设计的一种用户数据报协议(UDP)上的可靠传输协议。UDT的设计目标是在保持UDP的低延迟和高带宽利用率的同时,提供类似TCP的可靠性...

    UDT协议UDP可靠数据传输协议.pdf

    UDT,全称为User Datagram Transport,是一种基于UDP(用户数据报协议)的可靠数据传输协议。UDP本身是一种无连接的、不可靠的传输协议,它不保证数据的顺序和完整性,也不进行拥塞控制。然而,UDT针对UDP的这些不足...

    UDT协议UDP可靠数据传输协议.docx

    UDT,全称为User Datagram Transport,是一种基于UDP(用户数据报协议)的可靠数据传输协议,设计用于在高带宽时延乘积(Bandwidth-Delay Product, BDP)环境中提供高效、公平和稳定的传输服务。传统的TCP协议在面对...

    udt 基于udp的可靠连接

    UDT(UDP-based Data Transfer Protocol)是一种用于高速数据传输的协议,它建立在用户数据报协议(UDP)之上,但提供了TCP(传输控制协议)类似的可靠性。UDT的设计目标是解决在大规模网络环境下,尤其是互联网上的...

    基于UDP的高速传输协议UDT4 详解

    1. **基于UDP的基础**:UDT4利用了用户数据报协议(UDP)的无连接特性,避免了TCP的三次握手和四次挥手过程,从而减少了建立和关闭连接的时间,提高了数据传输的即时性。 2. **拥塞控制**:尽管UDP没有内置的拥塞...

    udt源码 udp可靠性传输

    UDT(UDP-based Data Transport)是一种专为大数据传输设计的开源传输协议,它构建于用户数据报协议(UDP)之上,旨在提供比TCP更高效、更可靠的传输服务。UDT结合了UDP的低延迟和高带宽利用率特性,并通过自定义的...

    udt 4.8 基于udp的可靠通讯

    UDT(UDP-based Data Transfer Protocol)是一种用于网络通信的开源协议,主要设计用来提供类似于TCP的可靠传输服务,但基于用户数据报协议(UDP)。UDT 4.8是其最新版本,它在前几版的基础上进行了优化和增强,旨在...

    用UDP实现的可靠传输源码

    在“udt4”这个文件中,很可能是UDT(UDP-based Reliable Transport)的源代码实现,UDT是一个开源的、高性能的、基于UDP的可靠传输协议,它结合了TCP的可靠性和UDP的低延迟,适用于大数据传输和实时应用。UDT内部...

    基于udp:tcp:实现手机镜像功能的demo;同时添加了一个使用udt-java实现视频传输的用例.zip

    UDT是一种基于UDP的高效可靠的数据传输协议,设计初衷是为了满足大规模数据传输和实时应用的需求。UDT通过自定义的拥塞控制算法,能够在保持低延迟的同时,提供类似TCP的可靠性。在视频传输的案例中,UDT可能是为了...

    基于DUP的可靠数据传输的几个实现方法

    这时,基于UDP(用户数据报协议)的可靠数据传输方案应运而生,以克服TCP的不足。 UDP是一种无连接的、轻量级的传输协议,不保证数据包的顺序和可靠性,因此需要额外的机制来确保数据的完整传输。以下是一些基于UDP...

    长距离高带宽环境下UDT协议的分析与应用

    UDT(UDP-based Data Transfer Protocol)协议是基于UDP(用户数据报协议)的一种数据传输协议,它的设计目标是为了解决在高带宽、长距离网络环境下,TCP(传输控制协议)所面临的传输效率问题。UDT特别适用于数据...

    udt.sdk.3.2.tar.gz_udp 可靠 Linux_udt_可靠 udp

    UDT(UDP-based Data Transport)是一种专为高速数据传输设计的传输层协议,它基于用户数据报协议(UDP)但提供了类似TCP的可靠性。UDT SDK 3.2 是一个用于开发UDT应用的软件开发工具包,适用于Linux操作系统。这个...

Global site tag (gtag.js) - Google Analytics