CS144计算机网络 Lab3

一、简介

这里记录了笔者学习 CS144 计算机网络 Lab3 的一些笔记 - TCP 发送方实现 TCPSender

CS144 Lab3 实验指导书 - Lab Checkpoint 3: the TCP sender

个人 CS144 实验项目地址 - github

二、环境配置

当前我们的实验代码位于 master 分支,而在完成 Lab 之前需要合并一些依赖代码,因此执行以下命令:

1
git merge origin/lab3-startercode

之后重新 make 编译即可。

三、TCPSender 简述

1. TCPSender 功能

TCP Sender 负责将数据以 TCP 报文的形式发送,其需要完成的功能有:

  • 将 ByteStream 中的数据以 TCP 报文形式持续发送给接收者。
  • 处理 TCPReceiver 传入的 ackno 和 window size,以追踪接收者当前的接收状态,以及检测丢包情况。
  • 经过一个超时时间后仍然没有接收到 TCPReceiver 发送的针对某个数据包的 ack 包,则重传对应的原始数据包。

2. 如何检测丢包

TCP 使用超时重传机制。TCPSender 除了将原始数据流分解成众多 TCP 报文并发送以外,它还会追踪每个已发送报文(已被发送但还未被接收)的发送时间。如果某些已发送报文太久没有被接收方确认(即接收方接收到对应的 ackno),则该数据包必须重传

需要注意的是,接收方返回的 ackno 并不一定对应着发送方返回的 seqno(也不和 seqno 有算数关系),这是因为发送的数据可能会因为内存问题,被接收方截断。

接收方确认某个报文,指的是该报文的所有字节索引都已被确认。这意味着如果该报文只有部分被确认,则不能说明该报文已被完全确认。

TCP 的超时机制比较麻烦,这是因为超时机制直接影响到应用程序从远程服务器上读取数据的响应时间,以及影响到网络拥堵的程度。以下是实现 TCPSender 时需要注意的一些点:

  • 每隔几毫秒,TCPSender的 tick 函数将会被调用,其参数声明了过去的时间。这是 TCPSender 唯一能调用的超时时间相关函数。因为直接调用 clock 或者 time 将会导致测试套件不可用。

  • TCPSender 在构造时会被给予一个重传超时时间 RTO的初始值。RTO 是在重新发送未完成 TCP 段之前需要等待的毫秒数。RTO值将会随着时间的流逝(或者更应该说是网络环境的变化)而变化,但初始的RTO将始终不变。

  • 在 TCPSender 中,我们需要实现一个重传计时器。该计时器将会在 RTO 结束时进行一些操作。

  • 当每次发送包含数据的数据包时,都需要启动重传计时器,并让它在 RTO 毫秒后超时。若所有发送中报文均被确认,则终止重传计时器。

  • 如果重传计时器超时,则需要进行以下几步(稍微有点麻烦)

    • 重传尚未被 TCP 接收方完全确认的最早报文(即最低 ackno所对应的报文)。这一步需要我们将发送中的报文数据保存至一个新的数据结构中,这样才可以追踪正处于发送状态的数据。

    • 如果接收者的 window size 不为 0,即可以正常接收数据,则

      • 跟踪连续重传次数。过多的重传次数可能意味着网络的中断,需要立即停止重传。
      • 将RTO的值设置为先前的两倍,以降低较差网络环境的重传速度,以避免加深网络环境的拥堵。
      • 重置并重启重传计时器。

      接收者 window size 为 0 的情况将在下面说明。

  • 当接收者给发送者一个确认成功接收新数据的 ack 包时(absolute ack seqno 比之前接收到的 ackno 更大):

    • 将 RTO 设置回初始值
    • 如果发送方存在尚未完成的数据,则重新启动重传定时器
    • 连续重传计数清零。

3. TCPSender 要求

在该实验中,我们需要完成 TCPSender 的以下四个接口:

  • fill_window:TCPSender 从 ByteStream 中读取数据,并以 TCPSegement 的形式发送,尽可能地填充接收者的窗口。但每个TCP段的大小不得超过 TCPConfig::MAX PAYLOAD SIZE

    若接收方的 Windows size 为 0,则发送方将按照接收方 window size 为 1 的情况进行处理,持续发包。

    因为虽然此时发送方发送的数据包可能会被接收方拒绝,但接收方可以在反向发送 ack 包时,将自己最新的 window size 返回给发送者。否则若双方停止了通信,那么当接收方的 window size 变大后,发送方仍然无法得知接收方可接受的字节数量。

    若远程没有 ack 这个在 window size 为 0 的情况下发送的一字节数据包,那么发送者重传时不要将 RTO 乘2。这是因为将 RTO 双倍的目的是为了避免网络拥堵,但此时的数据包丢弃并不是因为网络拥堵的问题,而是远程放不下了。

  • ack_received:对接收方返回的 ackno 和 window size 进行处理。丢弃那些已经完全确认但仍然处于追踪队列的数据包。同时如果 window size 仍然存在空闲,则继续发包。

  • tick:该函数将会被调用以指示经过的时间长度。发送方可能需要重新发送一些超时且没有被确认的数据包。

  • send_empty_segment:生成并发送一个在 seq 空间中长度为 0正确设置 seqno 的 TCPSegment,这可让用户发送一个空的 ACK 段。

4. TCPSender 状态转换图

我们无需定义新的状态变量,只需合理利用好各个公共接口的状态,即可快速确认当前的状态。

image-20211109080457029

四、TCPSender 实现

实现起来有几个坑点:

  • 当 SYN 设置后,payload 应该在尽可能装的基础之上,少装入 1byte,因为这个 byte 大小被 SYN 占用。

    而在 payload 尽可能装的基础上,若 FIN 装不下了,则必须在下一个包中装入 FIN 。

  • FIN 包的发送必须满足三个条件:

    • 从来没发送过 FIN。这是为了防止发送方在发送 FIN 包并接收到 FIN ack 包之后,循环用 FIN 包填充发送窗口的情况。
    • 输入字节流处于 EOF
    • window 减去 payload 大小后,仍然可以存放下 FIN
  • 当循环填充发送窗口时,若发送窗口大小足够但本地没有数据包需要发送,则必须停止发送。

    若当前 Segment 是 FIN 包,则在发送完该包后,立即停止填充发送窗口。

  • 重传定时器追踪的是发送者距离上次接收到新 ack 包的时间,而不是每个处于发送中的包的超时时间。因此除 SYN 包以外(它会启动定时器),其他发包操作将不会重置 重传定时器,同时也无需为每个数据包配备一个定时器。

    同时,只有存在新数据包被接收方确认后,才会重置定时器。

    tick 函数也是类似,只有存在处于发送状态的数据包时,重传定时器才起作用。若重传定时器超时,则重传的是第一个 seqno 最小且尚未重传的数据包。

  • 当接收方的 window size 为 0 时,仍旧按照 window size 为 1 时去处理,发送一字节数据。但是,若远程没有发送 ack 包的时候,不要将 RTO 双倍,还是重置为之前的 RTO。

以下是我的实现:

类声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TCPSender {
private:
int _timeout{-1};
int _timecount{0};

std::map<size_t, TCPSegment> _outgoing_map{};
size_t _outgoing_bytes{0};

size_t _last_window_size{1};
bool _set_syn_flag{false};
bool _set_fin_flag{false};
size_t _consecutive_retransmissions_count{0};

//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;

[...]
}

类方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
uint64_t TCPSender::bytes_in_flight() const { return _outgoing_bytes; }

void TCPSender::fill_window() {
// 如果远程窗口大小为 0, 则把其视为 1 进行操作
size_t curr_window_size = _last_window_size ? _last_window_size : 1;
// 循环填充窗口
while (curr_window_size > _outgoing_bytes) {
// 尝试构造单个数据包
// 如果此时尚未发送 SYN 数据包,则立即发送
TCPSegment segment;
if (!_set_syn_flag) {
segment.header().syn = true;
_set_syn_flag = true;
}
// 设置 seqno
segment.header().seqno = next_seqno();

// 装入 payload.
const size_t payload_size =
min(TCPConfig::MAX_PAYLOAD_SIZE, curr_window_size - _outgoing_bytes - segment.header().syn);
string payload = _stream.read(payload_size);

/**
* 读取好后,如果满足以下条件,则增加 FIN
* 1. 从来没发送过 FIN
* 2. 输入字节流处于 EOF
* 3. window 减去 payload 大小后,仍然可以存放下 FIN
*/
if (!_set_fin_flag && _stream.eof() && payload.size() + _outgoing_bytes < curr_window_size)
_set_fin_flag = segment.header().fin = true;

segment.payload() = Buffer(move(payload));

// 如果没有任何数据,则停止数据包的发送
if (segment.length_in_sequence_space() == 0)
break;

// 如果没有正在等待的数据包,则重设更新时间
if (_outgoing_map.empty()) {
_timeout = _initial_retransmission_timeout;
_timecount = 0;
}

// 发送
_segments_out.push(segment);

// 追踪这些数据包
_outgoing_bytes += segment.length_in_sequence_space();
_outgoing_map.insert(make_pair(_next_seqno, segment));
// 更新待发送 abs seqno
_next_seqno += segment.length_in_sequence_space();

// 如果设置了 fin,则直接退出填充 window 的操作
if (segment.header().fin)
break;
}
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
size_t abs_seqno = unwrap(ackno, _isn, _next_seqno);
// 如果传入的 ack 是不可靠的,则直接丢弃
if (abs_seqno > _next_seqno)
return;
// 遍历数据结构,将已经接收到的数据包丢弃
for (auto iter = _outgoing_map.begin(); iter != _outgoing_map.end();) {
// 如果一个发送的数据包已经被成功接收
const TCPSegment &seg = iter->second;
if (iter->first + seg.length_in_sequence_space() <= abs_seqno) {
_outgoing_bytes -= seg.length_in_sequence_space();
iter = _outgoing_map.erase(iter);

// 如果有新的数据包被成功接收,则清空超时时间
_timeout = _initial_retransmission_timeout;
_timecount = 0;
}
// 如果当前遍历到的数据包还没被接收,则说明后面的数据包均未被接收,因此直接返回
else
break;
}
_consecutive_retransmissions_count = 0;
// 填充后面的数据
_last_window_size = window_size;
fill_window();
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_timecount += ms_since_last_tick;

auto iter = _outgoing_map.begin();
// 如果存在发送中的数据包,并且定时器超时
if (iter != _outgoing_map.end() && _timecount >= _timeout) {
// 如果窗口大小不为0还超时,则说明网络拥堵
if (_last_window_size > 0)
_timeout *= 2;
_timecount = 0;
_segments_out.push(iter->second);
// 连续重传计时器增加
++_consecutive_retransmissions_count;
}
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions_count; }

void TCPSender::send_empty_segment() {
TCPSegment segment;
segment.header().seqno = next_seqno();
_segments_out.push(segment);
}
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2020-2024 Kiprey
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~