页次: 1
网络拥塞的概念大家一定不陌生,肯定都有亲生体会:比如节假日的高速路堵车。本来车流量已经很大了,如果再不限制高速口的车进入,整条路只会越来越堵,所以交管部门可能会临时限流,只允许车辆下高速,不允许上高速!互联网刚发明的那会还没有拥塞的概念,各个节点死命地传输数据,导致网络中各种路由设备的buff瞬间被填满,新来的包只能丢弃(像不像针对网络中专设备的DOS攻击了?)!为了维持网络的正常运转,需要接入网络的所有节点有节制地发送数据,避免网络拥塞!但如果节点过于节制,发送的数据报文过少,又会降低整个网络的吞吐量、利用率(这些网络设备闲着也是闲着)等,怎么才能尽可能最大化吞吐量,又能避免网络拥塞了?拥塞控制算法诞生了!拥塞控制的算法很多,我这个版本采用的是cubic算法(算法细节可以参考链接2和3),本文以这个为准分析算法实现的细节!
1、拥塞控制的控制引擎介绍:
(1)由于拥塞控制涉及到很多复杂的计算公式,每个公式都有很多计算因子factor,所以要先给这些factor初始化赋值,这些赋值在cubictcp_register函数中统一完成,如下:
/*初始化各种计算的factor*/
static int __init cubictcp_register(void)
{
BUILD_BUG_ON(sizeof(struct bictcp) > ICSK_CA_PRIV_SIZE);
/* Precompute a bunch of the scaling factors that are used per-packet
* based on SRTT of 100ms;计算SRTT=100ms时的缩放因子
*/
beta_scale = 8*(BICTCP_BETA_SCALE+beta) / 3
/ (BICTCP_BETA_SCALE - beta);
cube_rtt_scale = (bic_scale * 10); /* 1024*c/rtt */
/* calculate the "K" for (wmax-cwnd) = c/rtt * K^3
* so K = cubic_root( (wmax-cwnd)*rtt/c )
* the unit of K is bictcp_HZ=2^10, not HZ
*
* c = bic_scale >> 10
* rtt = 100ms
*
* the following code has been designed and tested for
* cwnd < 1 million packets
* RTT < 100 seconds
* HZ < 1,000,00 (corresponding to 10 nano-second)
*/
/* 1/c * 2^2*bictcp_HZ * srtt */
cube_factor = 1ull << (10+3*BICTCP_HZ); /* 2^40 */
/* divide by bic_scale and by constant Srtt (100ms) */
do_div(cube_factor, bic_scale * 10);
return tcp_register_congestion_control(&cubictcp);
}
(2)由于cubic只是拥塞控制的算法之一(还有其他好几种算法),并且未来可能还会诞生新的拥塞控制算法,为了便于管理现有算法,同时也能给新增的算法预留空间,linux采用了链表来保存/注册所有的拥塞控制算法,实现的方法就是初始化函数最后一行的tcp_register_congestion_control,如下:
/*
* Attach new congestion control algorithm to the list
* of available options.
*/
int tcp_register_congestion_control(struct tcp_congestion_ops *ca)
{
int ret = 0;
/* all algorithms must implement ssthresh and cong_avoid ops */
if (!ca->ssthresh || !(ca->cong_avoid || ca->cong_control)) {
pr_err("%s does not implement required ops\n", ca->name);
return -EINVAL;
}
//根据名称、长度等计算出一个hash值,加快比对速度(早期版本是对比字符串,效率低;key是32bit的整数,比对效率比字符串快多了)
ca->key = jhash(ca->name, sizeof(ca->name), strlen(ca->name));
spin_lock(&tcp_cong_list_lock);
if (ca->key == TCP_CA_UNSPEC || tcp_ca_find_key(ca->key)) {//检查一下是不是已经在list里面了
pr_notice("%s already registered or non-unique key\n",
ca->name);
ret = -EEXIST;
} else {//新的拥塞控制算法加入链表
list_add_tail_rcu(&ca->list, &tcp_cong_list);
pr_debug("%s registered\n", ca->name);
}
spin_unlock(&tcp_cong_list_lock);
return ret;
}
(3)从注释的地方可以看到加入链表的是tcp_congestion_ops结构体,这个结构体也是所有拥塞控制算法需要实现的结构体;每次有拥塞算法被发明后,就实现这个结构体中的部分甚至全部方法,然后调用上面的注册方法把新算法加入链表,后续可以通过遍历链表的方式选择合适的拥塞控制算法!结构体如下:
struct tcp_congestion_ops {
struct list_head list;
u32 key;/* 算法名称的哈希值 */
u32 flags;
/* initialize private data (optional) */
void (*init)(struct sock *sk);
/* cleanup private data (optional) */
void (*release)(struct sock *sk);
/* return slow start threshold (required) */
u32 (*ssthresh)(struct sock *sk);
/* do new cwnd calculation (required) */
void (*cong_avoid)(struct sock *sk, u32 ack, u32 acked);
/* call before changing ca_state (optional) */
void (*set_state)(struct sock *sk, u8 new_state);
/* call when cwnd event occurs (optional) */
void (*cwnd_event)(struct sock *sk, enum tcp_ca_event ev);
/* call when ack arrives (optional) */
void (*in_ack_event)(struct sock *sk, u32 flags);
/* new value of cwnd after loss (optional) */
u32 (*undo_cwnd)(struct sock *sk);
/* hook for packet ack accounting (optional) */
void (*pkts_acked)(struct sock *sk, const struct ack_sample *sample);
/* suggest number of segments for each skb to transmit (optional) */
u32 (*tso_segs_goal)(struct sock *sk);
/* returns the multiplier used in tcp_sndbuf_expand (optional) */
u32 (*sndbuf_expand)(struct sock *sk);
/* call when packets are delivered to update cwnd and pacing rate,
* after all the ca_state processing. (optional)
*/
void (*cong_control)(struct sock *sk, const struct rate_sample *rs);
/* get info for inet_diag (optional) */
size_t (*get_info)(struct sock *sk, u32 ext, int *attr,
union tcp_cc_info *info);
char name[TCP_CA_NAME_MAX];/* 拥塞控制算法的名称 */
struct module *owner;
};
有成员字段,也有函数指针;函数名是固定的,也就是说函数的功能都是固定的,每种拥塞控制算法各自实现这些函数功能就行了,这个思路和驱动类似:linux定义了驱动的接口函数名称,每个驱动厂家自己实现接口函数的功能!事实上拥塞算法也是和驱动一样,都是以模块的形式加载到内核的!
(4)上面的结构体定义了每种拥塞控制算法的接口名称和功能,那么具体到cubic算法,这些功能都是在哪些函数实现的了?在net\ipv4\tcp_cubic.c文件里面做的映射:
static struct tcp_congestion_ops cubictcp __read_mostly = {
.init = bictcp_init,
.ssthresh = bictcp_recalc_ssthresh,
.cong_avoid = bictcp_cong_avoid,
.set_state = bictcp_state,
.undo_cwnd = bictcp_undo_cwnd,
.cwnd_event = bictcp_cwnd_event,
.pkts_acked = bictcp_acked,
.owner = THIS_MODULE,
.name = "cubic",
};
这个结构体的名称叫cubictcp,刚好就在文章开头介绍的cubictcp_register函数中注册的!
2、上述介绍围绕着初始化、注册等重要功能,这些功能抽象出的框架就是拥塞控制引擎!尽管拥塞算法实现的细节可以不同,但是使用的引擎框架都是一样的,不得不佩服linux研发人员的模块抽象和概括能力(好多大厂职级晋升答辩的重要考核之一就是这种模块总结抽象、举一反三的能力!);
(1)cubic算法的实现涉及到很变量,为了统一管理,这了变量都定义在bictcp结构体中了,如下:
/* BIC TCP Parameters */
struct bictcp {
u32 cnt; /* increase cwnd by 1 after ACKs; 每次 cwnd 增长 1/cnt 的比例*/
u32 last_max_cwnd; /* last maximum snd_cwnd; snd_cwnd 之前的最大值*/
u32 loss_cwnd; /* congestion window at last loss;最近一次发生丢失的时候的拥塞窗口 */
u32 last_cwnd; /* the last snd_cwnd;最近的 snd_cwnd */
u32 last_time; /* time when updated last_cwnd;更新 last_cwnd 的时间 */
u32 epoch_start; /* beginning of an epoch;一轮的开始 */
#define ACK_RATIO_SHIFT 4
/*限制了delayed_ack的最大值为(32 << 4),也就是说Packets/ACKs的最大估计值限制为32;
为什么要增加这个限制呢?这是因为如果发送窗口很大,并且这个窗口的数据包的ACK大量丢失,那
么发送端就会得到一个累积确认了非常多数据包的ACK,这会造成delayed_ack值剧烈的增大。如果
一个ACK累积确认的数据包超过4096个,那么16位的delayed_ack就会溢出,最后的值可能为0。我
们知道delayed_ack在bictcp_update中是作为除数的,这时会产生除数为0的错误*/
u32 delayed_ack; /* estimate the ratio of Packets/ACKs << 4 */
};
(2)这么多变量,使用之前肯定是要初始化的,如下:
//将必要的参数都初始化为 0
static inline void bictcp_reset(struct bictcp *ca)
{
ca->cnt = 0;
ca->last_max_cwnd = 0;
ca->last_cwnd = 0;
ca->last_time = 0;
ca->epoch_start = 0;
ca->delayed_ack = 2 << ACK_RATIO_SHIFT;
}
static void bictcp_init(struct sock *sk)
{
struct bictcp *ca = inet_csk_ca(sk);
bictcp_reset(ca);
ca->loss_cwnd = 0;//此时尚未发送任何丢失,所以初始化为 0
if (initial_ssthresh)
tcp_sk(sk)->snd_ssthresh = initial_ssthresh;
}
(2)这么多变量,使用之前肯定是要初始化的,如下:
//将必要的参数都初始化为 0
static inline void bictcp_reset(struct bictcp *ca)
{
ca->cnt = 0;
ca->last_max_cwnd = 0;
ca->last_cwnd = 0;
ca->last_time = 0;
ca->epoch_start = 0;
ca->delayed_ack = 2 << ACK_RATIO_SHIFT;
}
static void bictcp_init(struct sock *sk)
{
struct bictcp *ca = inet_csk_ca(sk);
bictcp_reset(ca);
ca->loss_cwnd = 0;//此时尚未发送任何丢失,所以初始化为 0
if (initial_ssthresh)
tcp_sk(sk)->snd_ssthresh = initial_ssthresh;
}
(3)算法开始执行后,需要计算ssthresh,在bictcp_recalc_ssthresh中实现:
/*
* behave like Reno until low_window is reached,
* then increase congestion window slowly
*/
static u32 bictcp_recalc_ssthresh(struct sock *sk)
{
const struct tcp_sock *tp = tcp_sk(sk);
struct bictcp *ca = inet_csk_ca(sk);
ca->epoch_start = 0; /* end of epoch */
/* Wmax and fast convergence */
if (tp->snd_cwnd < ca->last_max_cwnd && fast_convergence)
ca->last_max_cwnd = (tp->snd_cwnd * (BICTCP_BETA_SCALE + beta))
/ (2 * BICTCP_BETA_SCALE);
else
ca->last_max_cwnd = tp->snd_cwnd;
ca->loss_cwnd = tp->snd_cwnd;
if (tp->snd_cwnd <= low_window)
return max(tp->snd_cwnd >> 1U, 2U);
else
return max((tp->snd_cwnd * beta) / BICTCP_BETA_SCALE, 2U);
}
这里面最核心的就是Fast Convergence机制了!在网络中,一旦有新的节点加入通信,肯定是要占用网络带宽的,势必需要其他正在通信的节点让渡出部分带宽。为了让旧节点尽快释放带宽,这里采用了Fast Convergence机制:每次发生丢包后,会对比此次丢包时拥塞窗口的大小和之前的拥塞窗口大小,如小于上次的拥塞窗口,说明有新节点加入通信,占用了部分带宽。此时旧节点需要多留一些带宽给新节点使用,以使得网络尽快收敛到稳定状态!
(3)当发生丢包时,记录当前窗口为W-max, 减小窗口后的大小为W,BIC算法就是根据这个原理在(W, W-max]区间内做二分搜索;当接近于W-max时曲线应该更平滑,当离W-max较远的时候,曲线可以更加陡峭;
/*
* Compute congestion window to use.
二分法重新计算拥塞窗口
*/
static inline void bictcp_update(struct bictcp *ca, u32 cwnd)
{
if (ca->last_cwnd == cwnd &&
(s32)(tcp_time_stamp - ca->last_time) <= HZ / 32)
return;
ca->last_cwnd = cwnd;
ca->last_time = tcp_time_stamp;
if (ca->epoch_start == 0) /* record the beginning of an epoch */
ca->epoch_start = tcp_time_stamp;
/* start off normal */
if (cwnd <= low_window) {
ca->cnt = cwnd;
return;
}
/* binary increase :二分增加;相比reno的线性增加,这里的效率明显高很多*/
if (cwnd < ca->last_max_cwnd) {
__u32 dist = (ca->last_max_cwnd - cwnd)
/ BICTCP_B;
if (dist > max_increment)
/* linear increase */
ca->cnt = cwnd / max_increment;
else if (dist <= 1U)
/* binary search increase */
ca->cnt = (cwnd * smooth_part) / BICTCP_B;
else
/* binary search increase */
ca->cnt = cwnd / dist;
} else {
/* slow start AMD linear increase */
if (cwnd < ca->last_max_cwnd + BICTCP_B)
/* slow start */
ca->cnt = (cwnd * smooth_part) / BICTCP_B;
else if (cwnd < ca->last_max_cwnd + max_increment*(BICTCP_B-1))
/* slow start */
ca->cnt = (cwnd * (BICTCP_B-1))
/ (cwnd - ca->last_max_cwnd);
else
/* linear increase */
ca->cnt = cwnd / max_increment;
}
/* if in slow start or link utilization is very low */
if (ca->last_max_cwnd == 0) {
if (ca->cnt > 20) /* increase cwnd 5% per RTT */
ca->cnt = 20;
}
ca->cnt = (ca->cnt << ACK_RATIO_SHIFT) / ca->delayed_ack;
if (ca->cnt == 0) /* cannot be zero */
ca->cnt = 1;
}
static void bictcp_cong_avoid(struct sock *sk, u32 ack, u32 acked)
{
struct tcp_sock *tp = tcp_sk(sk);
struct bictcp *ca = inet_csk_ca(sk);
if (!tcp_is_cwnd_limited(sk))
return;
if (tcp_in_slow_start(tp))
tcp_slow_start(tp, acked);
else {
bictcp_update(ca, tp->snd_cwnd);//计算ca->cnt, 表示增加一个cwnd需要的ack数量
tcp_cong_avoid_ai(tp, ca->cnt, 1);// 根据ca->cnt计算snd_cwnd
}
}
计算snd_cwnd值:
/* In theory this is tp->snd_cwnd += 1 / tp->snd_cwnd (or alternative w),
* for every packet that was ACKed.
*/
void tcp_cong_avoid_ai(struct tcp_sock *tp, u32 w, u32 acked)
{
/* If credits accumulated at a higher w, apply them gently now. */
/*如果 w 很大,那么, snd_cwnd_cnt 可能会积累为一个很大的值;
此后, w 由于种种原因突然被缩小了很多。那么下面计算处理的 delta 就会很大。
这可能导致流量的爆发。为了避免这种情况,这里提前增加了一个特判
*/
if (tp->snd_cwnd_cnt >= w) {
tp->snd_cwnd_cnt = 0;
tp->snd_cwnd++;
}
/* 累计被确认的包的数目 */
tp->snd_cwnd_cnt += acked;
if (tp->snd_cwnd_cnt >= w) {
/* 窗口增大的大小应当为被确认的包的数目除以当前窗口大小。
* 以往都是直接加一,但直接加一并不是正确的加法增加 (AI) 的实现。
* 例如, w 为 10, acked 为 20 时,应当增加 20/10=2,而不是 1。
*/
u32 delta = tp->snd_cwnd_cnt / w;
tp->snd_cwnd_cnt -= delta * w;
tp->snd_cwnd += delta;
}
tp->snd_cwnd = min(tp->snd_cwnd, tp->snd_cwnd_clamp);
}
(4)重新计算epoch_start的值:
/* 如果目前没有任何数据包在传输了,那么需要重新设定epoch_start。这个是为了
解决当应用程序在一段时间内不发送任何数据时, now-epoch_start 会变得很大,由此,
根据 Cubic 函数计算出来的目标拥塞窗口值也会变得很大。但显然,这是一个错误。因此,
需要在应用程序重新开始发送数据时,重置epoch_start 的值。在这里CA_EVENT_TX_START事
件表明目前所有的包都已经被确认了(即没有任何正在传输的包),而应用程序又开始
发送新的数据包了。所有的包都被确认说明应用程序有一段时间没有发包。因而,在程
序又重新开始发包时,需要重新设定 epoch_start的值,以便在计算拥塞窗口的大小时,
仍能合理地遵循 cubic 函数的曲线*/
static void bictcp_cwnd_event(struct sock *sk, enum tcp_ca_event event)
{
if (event == CA_EVENT_TX_START) {
struct bictcp *ca = inet_csk_ca(sk);
u32 now = tcp_time_stamp;
s32 delta;
delta = now - tcp_sk(sk)->lsndtime;
/* We were application limited (idle) for a while.
* Shift epoch_start to keep cwnd growth to cubic curve.
*/
if (ca->epoch_start && delta > 0) {
ca->epoch_start += delta;
if (after(ca->epoch_start, now))
ca->epoch_start = now;
}
return;
}
}
(5)发送到收到ACK后,需要重新计算链路的延迟情况,以确认后续的各种窗口
/* Track delayed acknowledgment ratio using sliding window
* ratio = (15*ratio + sample) / 16
*/
static void bictcp_acked(struct sock *sk, const struct ack_sample *sample)
{
const struct inet_connection_sock *icsk = inet_csk(sk);
if (icsk->icsk_ca_state == TCP_CA_Open) {
struct bictcp *ca = inet_csk_ca(sk);
ca->delayed_ack += sample->pkts_acked -
(ca->delayed_ack >> ACK_RATIO_SHIFT);
}
}
总结:
1、拥塞控制本质:
(1)目的:控制发送方发数据包的速度,避免少数节点无节制发数据导致整个网络被占满,其他节点无法通信;但如果过度节制又会导致网络大幅空闲,利用率降低,所以需要在拥塞和利用率之间找到平衡!
(2)节点新加入时,不知道网络是否拥塞,只能不停地探测:依次同时发送1、2、4、8个.....(按照2^N增长)数据包!当然,这种指数级别的增长肯定是会到头的,达到以下3个条件之一后就要考虑减少同时发送的数据包了:
达到了人为事先设置的ssthresh值
等待的ack超时,可能是发送的包丢了,也可能是ack的包丢了;当然也可能没丢,还在路上了!
收到多个(一般是3个)冗余的ack,说明对方没收到前序的某个包
(3)减少同时发送的数据包,具体减少到多少了?一般是从一半的量重新开始!比如同时发送16个数据包时触发了上述3个条件之一,那么重新从每次发送8个数据包开始探测!
(4)重新开始探测后,每次同时发送的数据包需要增加多少个了?reno是线性增加,BIC是二分增加,cubic用的是3次方的公式计算增加值!
reno的线性增加:
bic的二分增加:
cubic的三次函数:
2、jhash是个比较好的hash算法,这个版本的实现如下:
/* jhash - hash an arbitrary key
* @k: sequence of bytes as key
* @length: the length of the key
* @initval: the previous hash, or an arbitray value
*
* The generic version, hashes an arbitrary sequence of bytes.
* No alignment or length assumptions are made about the input key.
*
* Returns the hash value of the key. The result depends on endianness.
*/
static inline u32 jhash(const void *key, u32 length, u32 initval)
{
u32 a, b, c;
const u8 *k = key;
/* Set up the internal state */
a = b = c = JHASH_INITVAL + length + initval;
/* All but the last block: affect some 32 bits of (a,b,c) */
while (length > 12) {
a += __get_unaligned_cpu32(k);
b += __get_unaligned_cpu32(k + 4);
c += __get_unaligned_cpu32(k + 8);
__jhash_mix(a, b, c);
length -= 12;
k += 12;
}
/* Last block: affect all 32 bits of (c) */
/* All the case statements fall through */
switch (length) {
case 12: c += (u32)k[11]<<24;
case 11: c += (u32)k[10]<<16;
case 10: c += (u32)k[9]<<8;
case 9: c += k[8];
case 8: b += (u32)k[7]<<24;
case 7: b += (u32)k[6]<<16;
case 6: b += (u32)k[5]<<8;
case 5: b += k[4];
case 4: a += (u32)k[3]<<24;
case 3: a += (u32)k[2]<<16;
case 2: a += (u32)k[1]<<8;
case 1: a += k[0];
__jhash_final(a, b, c);
case 0: /* Nothing left to add */
break;
}
return c;
}
参考:
1、https://www.bilibili.com/video/BV1L4411a7RN?from=search&seid=9607714680684056910 拥塞控制
2、https://www.bilibili.com/video/BV1MU4y157SY/ cubic拥塞控制
3、https://www.cnblogs.com/huang-xiang/p/13226229.html cubic拥塞控制算法
离线
页次: 1