实现TCPSender

这个lab有几个难点,分别是:

  1. TCP中会维护好几种状态:

    • CLOSE: SYN还没有被发送
    • SYN_SENT: SYN被发送了但是没有收到对应的ACK
    • SYN_ACKED: 正常状态,可以利用sender来通信了
    • SYN_ACKED_ASLO: sender已经发送完所以的字节流了,但是还没有发生FIN。
    • FIN_SENT: FIN已经被发送,但是还没有收到对应的ACK
    • FIN_ACKED: sender已经完成所以的任务。
  2. TCPSender使用的是累计确认的协议,也就是说如果收到合法的ACK,那么ACK之前的所有已经发送segments已经被成功接受。(如果收到的ACK大于我还未发送的Sequence Number那就是非法的ACK)

  3. timer的开启,重启,关闭的时机。

    在做这个是lab之前我一直以为是给每一个发出的segments安装一个timer,当segments超时的时候重发,但是看了huangrt01大佬的代码后,发现在这个lab中timer,timer记录的是在RTO时间内有没有收到合法的ACK,如果没有收到合法的ACK,那么就需要重传。

    下面简单的概括一下timer使用的时机:

    • 当发送一个新的segment的时候,如果timer没有开启,那么需要开启timer。
    • 当在RTO内收到一个合法的ACK,有两种情况:
      1. 如果sender没发完segments那么需要重启timer,重启的意思是timer从0开始计时。
      2. 如果sender已经发完所有的segments了那么需要关闭timer
    • 当超时的情况发生,也是两种情况:
      1. window_size = 0 : 重启timer,重传segments
      2. window_size != 0 : double RTO, 重启timer,重传segments

timer实现更加规范看RFC 6298第五小节

代码

tcp_sender.hh

//! \make by myself
class TCPTimer {
  private:
    //! true : timer start , false : timer not start
    bool _start;
    unsigned int init_time;
    //! Transmission time
    unsigned int transmission_time;
    //! retransmission timeout
    unsigned int RTO;

  public:
    //! Number of consecutive retransmissions
    unsigned int num_of_retransmission;

    TCPTimer(unsigned int time)
        : _start(false)
        , init_time(time)
        , transmission_time(0)
        , RTO(init_time)
        , num_of_retransmission(0) {}

    bool running() { return _start; }

    void close() {
        _start = false;
        num_of_retransmission = 0;
    }
    void start() {
        _start = true;
        RTO = init_time;
        transmission_time = 0;
        num_of_retransmission = 0;
    }
//! if window == 0 then keep RTO , otherwise double RTO
    void doubleOrkeep_RTO_and_restart(const size_t window) {
        if (!running())
            return;

        if (window != 0)
            RTO *= 2;

        transmission_time = 0;
        num_of_retransmission++;
    }

    bool timeout(const size_t ms_since_last_tick) {
        if (!running())
            return false;

        if (ms_since_last_tick + transmission_time >= RTO)
            return true;

        transmission_time += ms_since_last_tick;
        return false;
    }
};

class TCPSender {
  private:
   .............
    //! make by myself
    uint64_t _ackno;
    size_t _window_size;
    uint64_t _bytes_in_flight;
    TCPTimer timer;

    std::queue<TCPSegment> _segments_track{};

    void send_no_empty_segments(TCPSegment &seg);

    .............
}

tcp_sender.cc

#include "tcp_sender.hh"

#include "tcp_config.hh"

#include <random>

template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}

using namespace std;

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout
            ,const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _ackno(0)
    , _window_size(1)
    , _bytes_in_flight(0)
    , timer(retx_timeout) {}

uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }

void TCPSender::send_no_empty_segments(TCPSegment &seg) {
    seg.header().seqno = wrap(_next_seqno, _isn);
    _next_seqno += seg.length_in_sequence_space();
    _bytes_in_flight += seg.length_in_sequence_space();
    _segments_out.push(seg);
    _segments_track.push(seg);
    if (!timer.running()) {
        timer.start();
    }
}

void TCPSender::fill_window() {
    /*Status: CLOSED -> stream waiting to begin*/
    if (_next_seqno == 0) {
        TCPSegment seg;
        seg.header().syn = true;
        seg.header().seqno = next_seqno();
        send_no_empty_segments(seg);
    }
    /*Status: SYN_SENT -> stream start but nothing acknowledged*/
    else if (_next_seqno == _bytes_in_flight) {
        return;
    }

    size_t window_size = _window_size == 0 ? 1 : _window_size;
    size_t remain = 0;
//! 这里window_size 一定是大于 (_next_seqno - _ackno),不用担心溢出问题。文章后面解释
    while ((remain = window_size - (_next_seqno - _ackno))) {
        TCPSegment seg;
size_t len = TCPConfig::MAX_PAYLOAD_SIZE > remain ? remain : TCPConfig::MAX_PAYLOAD_SIZE;
        /*Status: SYN_ACKED -> stream ongoing*/
        if (!_stream.eof()) {
            seg.payload() = Buffer(_stream.read(len));
            if (_stream.eof() && remain - seg.length_in_sequence_space() > 0)
                seg.header().fin = true;
            if (seg.length_in_sequence_space() == 0)
                return;
            send_no_empty_segments(seg);
        }
/*Status: SYN_ACKED -> stream ongoing (stream has reached EOF but FIN hasn't been send yet)*/
        else if (_stream.eof()) {
            if (_next_seqno < _stream.bytes_written() + 2) {
                seg.header().fin = true;
                send_no_empty_segments(seg);
            }
         /*Status: FIN_SENT and FIN_ACKED both do nothing Just return */
            else
                return;
        }
    }
}

void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    uint64_t abs_ackno = unwrap(ackno, _isn, _ackno);

    //! 超出范围 _next_seqno还没发呢,哪来的abs_ackno > _next_seqno
    if (abs_ackno > _next_seqno) {
        return;
    }

    _window_size = static_cast<size_t>(window_size);

    //! 比abs_ackno大的先来了
    if (abs_ackno <= _ackno) {
        return;
    }

    _ackno = abs_ackno;

    //! 成功接受到新的ackno
    timer.start();

    while (!_segments_track.empty()) {
        TCPSegment seg = _segments_track.front();
        if (ackno.raw_value() < seg.header().seqno.raw_value() 
        + static_cast<uint32_t>(seg.length_in_sequence_space()))
            break;
        _bytes_in_flight -= seg.length_in_sequence_space();
        _segments_track.pop();
    }

    fill_window();

    if (_segments_track.empty()) {
        timer.close();
    }
}

void TCPSender::tick(const size_t ms_since_last_tick) {
    if (!timer.running() || !timer.timeout(ms_since_last_tick))
        return;
    if (_segments_track.empty()) {
        timer.close();
        return;
    }
    timer.doubleOrkeep_RTO_and_restart(_window_size);
    _segments_out.push(_segments_track.front());
}

unsigned int TCPSender::consecutive_retransmissions() const { 
    return timer.num_of_retransmission; 
}

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = wrap(_next_seqno, _isn);
    _segments_out.push(seg);
}

解释remain = window_size - (_next_seqno - _ackno))不会溢出

要解释这里就要联系一下lab2中实现的TCPReceiver

这里的window_size指的是receiver中buffer剩余的空间,当receiver往buffer中放入字节流的时候window_size变小,当上层应用读取buffer中的字节流的时候,window_size变大。

注意:
不是sender发送segment给receiver,receiver就立刻将segement放入buffer中了,而是当segments连续了才会被放入buffer中

这里我假设上次发送时的窗口大小为window_size_pre,同时上次 发送的首个segments就没有被receiver接收到 ,而其他的都被接收到了, 这个时候receiver不会把这批segments放入buffer中 ,所以发回来的ACK中装的window_size是等于window_size_pre的,所以window_size 不会大于(_next_seqno - _ackno);



计算机网络   CS144      CS144 lab3

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!