公告

Gentoo交流群:87709706 欢迎您的加入

#1 2022-10-11 12:58:30

batsom
管理团队
注册时间: 2022-08-03
帖子: 594
个人网站

linux源码解读(二十二):网络通信简介——网络拥塞控制之cubic算法

网络拥塞的概念大家一定不陌生,肯定都有亲生体会:比如节假日的高速路堵车。本来车流量已经很大了,如果再不限制高速口的车进入,整条路只会越来越堵,所以交管部门可能会临时限流,只允许车辆下高速,不允许上高速!互联网刚发明的那会还没有拥塞的概念,各个节点死命地传输数据,导致网络中各种路由设备的buff瞬间被填满,新来的包只能丢弃(像不像针对网络中专设备的DOS攻击了?)!为了维持网络的正常运转,需要接入网络的所有节点有节制地发送数据,避免网络拥塞!但如果节点过于节制,发送的数据报文过少,又会降低整个网络的吞吐量、利用率(这些网络设备闲着也是闲着)等,怎么才能尽可能最大化吞吐量,又能避免网络拥塞了?拥塞控制算法诞生了!拥塞控制的算法很多,我这个版本采用的是cubic算法(算法细节可以参考链接2和3),本文以这个为准分析算法实现的细节!

  1、拥塞控制的控制引擎介绍:

  (1)由于拥塞控制涉及到很多复杂的计算公式,每个公式都有很多计算因子factor,所以要先给这些factor初始化赋值,这些赋值在cubictcp_register函数中统一完成,如下:

/*初始化各种计算的factor*/
static int __init cubictcp_register(void)
{
    BUILD_BUG_ON(sizeof(struct bictcp) > ICSK_CA_PRIV_SIZE);

    /* Precompute a bunch of the scaling factors that are used per-packet
     * based on SRTT of 100ms;计算SRTT=100ms时的缩放因子
     */

    beta_scale = 8*(BICTCP_BETA_SCALE+beta) / 3
        / (BICTCP_BETA_SCALE - beta);

    cube_rtt_scale = (bic_scale * 10);    /* 1024*c/rtt */

    /* calculate the "K" for (wmax-cwnd) = c/rtt * K^3
     *  so K = cubic_root( (wmax-cwnd)*rtt/c )
     * the unit of K is bictcp_HZ=2^10, not HZ
     *
     *  c = bic_scale >> 10
     *  rtt = 100ms
     *
     * the following code has been designed and tested for
     * cwnd < 1 million packets
     * RTT < 100 seconds
     * HZ < 1,000,00  (corresponding to 10 nano-second)
     */

    /* 1/c * 2^2*bictcp_HZ * srtt */
    cube_factor = 1ull << (10+3*BICTCP_HZ); /* 2^40 */

    /* divide by bic_scale and by constant Srtt (100ms) */
    do_div(cube_factor, bic_scale * 10);

    return tcp_register_congestion_control(&cubictcp);
}

  (2)由于cubic只是拥塞控制的算法之一(还有其他好几种算法),并且未来可能还会诞生新的拥塞控制算法,为了便于管理现有算法,同时也能给新增的算法预留空间,linux采用了链表来保存/注册所有的拥塞控制算法,实现的方法就是初始化函数最后一行的tcp_register_congestion_control,如下:

/*
 * Attach new congestion control algorithm to the list
 * of available options.
 */
int tcp_register_congestion_control(struct tcp_congestion_ops *ca)
{
    int ret = 0;

    /* all algorithms must implement ssthresh and cong_avoid ops */
    if (!ca->ssthresh || !(ca->cong_avoid || ca->cong_control)) {
        pr_err("%s does not implement required ops\n", ca->name);
        return -EINVAL;
    }
    //根据名称、长度等计算出一个hash值,加快比对速度(早期版本是对比字符串,效率低;key是32bit的整数,比对效率比字符串快多了)
    ca->key = jhash(ca->name, sizeof(ca->name), strlen(ca->name));

    spin_lock(&tcp_cong_list_lock);
    if (ca->key == TCP_CA_UNSPEC || tcp_ca_find_key(ca->key)) {//检查一下是不是已经在list里面了
        pr_notice("%s already registered or non-unique key\n",
              ca->name);
        ret = -EEXIST;
    } else {//新的拥塞控制算法加入链表
        list_add_tail_rcu(&ca->list, &tcp_cong_list);
        pr_debug("%s registered\n", ca->name);
    }
    spin_unlock(&tcp_cong_list_lock);

    return ret;
}

  (3)从注释的地方可以看到加入链表的是tcp_congestion_ops结构体,这个结构体也是所有拥塞控制算法需要实现的结构体;每次有拥塞算法被发明后,就实现这个结构体中的部分甚至全部方法,然后调用上面的注册方法把新算法加入链表,后续可以通过遍历链表的方式选择合适的拥塞控制算法!结构体如下:

struct tcp_congestion_ops {
    struct list_head    list;
    u32 key;/* 算法名称的哈希值 */
    u32 flags;

    /* initialize private data (optional) */
    void (*init)(struct sock *sk);
    /* cleanup private data  (optional) */
    void (*release)(struct sock *sk);

    /* return slow start threshold (required) */
    u32 (*ssthresh)(struct sock *sk);
    /* do new cwnd calculation (required) */
    void (*cong_avoid)(struct sock *sk, u32 ack, u32 acked);
    /* call before changing ca_state (optional) */
    void (*set_state)(struct sock *sk, u8 new_state);
    /* call when cwnd event occurs (optional) */
    void (*cwnd_event)(struct sock *sk, enum tcp_ca_event ev);
    /* call when ack arrives (optional) */
    void (*in_ack_event)(struct sock *sk, u32 flags);
    /* new value of cwnd after loss (optional) */
    u32  (*undo_cwnd)(struct sock *sk);
    /* hook for packet ack accounting (optional) */
    void (*pkts_acked)(struct sock *sk, const struct ack_sample *sample);
    /* suggest number of segments for each skb to transmit (optional) */
    u32 (*tso_segs_goal)(struct sock *sk);
    /* returns the multiplier used in tcp_sndbuf_expand (optional) */
    u32 (*sndbuf_expand)(struct sock *sk);
    /* call when packets are delivered to update cwnd and pacing rate,
     * after all the ca_state processing. (optional)
     */
    void (*cong_control)(struct sock *sk, const struct rate_sample *rs);
    /* get info for inet_diag (optional) */
    size_t (*get_info)(struct sock *sk, u32 ext, int *attr,
               union tcp_cc_info *info);

    char         name[TCP_CA_NAME_MAX];/* 拥塞控制算法的名称 */
    struct module     *owner;
};

  有成员字段,也有函数指针;函数名是固定的,也就是说函数的功能都是固定的,每种拥塞控制算法各自实现这些函数功能就行了,这个思路和驱动类似:linux定义了驱动的接口函数名称,每个驱动厂家自己实现接口函数的功能!事实上拥塞算法也是和驱动一样,都是以模块的形式加载到内核的!

   (4)上面的结构体定义了每种拥塞控制算法的接口名称和功能,那么具体到cubic算法,这些功能都是在哪些函数实现的了?在net\ipv4\tcp_cubic.c文件里面做的映射:

static struct tcp_congestion_ops cubictcp __read_mostly = {
    .init        = bictcp_init,
    .ssthresh    = bictcp_recalc_ssthresh,
    .cong_avoid    = bictcp_cong_avoid,
    .set_state    = bictcp_state,
    .undo_cwnd    = bictcp_undo_cwnd,
    .cwnd_event    = bictcp_cwnd_event,
    .pkts_acked     = bictcp_acked,
    .owner        = THIS_MODULE,
    .name        = "cubic",
};

  这个结构体的名称叫cubictcp,刚好就在文章开头介绍的cubictcp_register函数中注册的!

       2、上述介绍围绕着初始化、注册等重要功能,这些功能抽象出的框架就是拥塞控制引擎!尽管拥塞算法实现的细节可以不同,但是使用的引擎框架都是一样的,不得不佩服linux研发人员的模块抽象和概括能力(好多大厂职级晋升答辩的重要考核之一就是这种模块总结抽象、举一反三的能力!);

      (1)cubic算法的实现涉及到很变量,为了统一管理,这了变量都定义在bictcp结构体中了,如下:

/* BIC TCP Parameters */
struct bictcp {
    u32    cnt;        /* increase cwnd by 1 after ACKs; 每次 cwnd 增长 1/cnt 的比例*/
    u32    last_max_cwnd;    /* last maximum snd_cwnd; snd_cwnd 之前的最大值*/
    u32    loss_cwnd;    /* congestion window at last loss;最近一次发生丢失的时候的拥塞窗口 */
    u32    last_cwnd;    /* the last snd_cwnd;最近的 snd_cwnd */
    u32    last_time;    /* time when updated last_cwnd;更新 last_cwnd 的时间 */
    u32    epoch_start;    /* beginning of an epoch;一轮的开始 */
#define ACK_RATIO_SHIFT    4
    /*限制了delayed_ack的最大值为(32 << 4),也就是说Packets/ACKs的最大估计值限制为32;
    为什么要增加这个限制呢?这是因为如果发送窗口很大,并且这个窗口的数据包的ACK大量丢失,那
    么发送端就会得到一个累积确认了非常多数据包的ACK,这会造成delayed_ack值剧烈的增大。如果
    一个ACK累积确认的数据包超过4096个,那么16位的delayed_ack就会溢出,最后的值可能为0。我
    们知道delayed_ack在bictcp_update中是作为除数的,这时会产生除数为0的错误*/
    u32    delayed_ack;    /* estimate the ratio of Packets/ACKs << 4 */
};

  (2)这么多变量,使用之前肯定是要初始化的,如下:

//将必要的参数都初始化为 0 
static inline void bictcp_reset(struct bictcp *ca)
{
    ca->cnt = 0;
    ca->last_max_cwnd = 0;
    ca->last_cwnd = 0;
    ca->last_time = 0;
    ca->epoch_start = 0;
    ca->delayed_ack = 2 << ACK_RATIO_SHIFT;
}

static void bictcp_init(struct sock *sk)
{
    struct bictcp *ca = inet_csk_ca(sk);

    bictcp_reset(ca);
    ca->loss_cwnd = 0;//此时尚未发送任何丢失,所以初始化为 0

    if (initial_ssthresh)
        tcp_sk(sk)->snd_ssthresh = initial_ssthresh;
}

  (2)这么多变量,使用之前肯定是要初始化的,如下:

//将必要的参数都初始化为 0 
static inline void bictcp_reset(struct bictcp *ca)
{
    ca->cnt = 0;
    ca->last_max_cwnd = 0;
    ca->last_cwnd = 0;
    ca->last_time = 0;
    ca->epoch_start = 0;
    ca->delayed_ack = 2 << ACK_RATIO_SHIFT;
}

static void bictcp_init(struct sock *sk)
{
    struct bictcp *ca = inet_csk_ca(sk);

    bictcp_reset(ca);
    ca->loss_cwnd = 0;//此时尚未发送任何丢失,所以初始化为 0

    if (initial_ssthresh)
        tcp_sk(sk)->snd_ssthresh = initial_ssthresh;
}

   (3)算法开始执行后,需要计算ssthresh,在bictcp_recalc_ssthresh中实现:

/*
 *    behave like Reno until low_window is reached,
 *    then increase congestion window slowly
 */
static u32 bictcp_recalc_ssthresh(struct sock *sk)
{
    const struct tcp_sock *tp = tcp_sk(sk);
    struct bictcp *ca = inet_csk_ca(sk);

    ca->epoch_start = 0;    /* end of epoch */

    /* Wmax and fast convergence */
    if (tp->snd_cwnd < ca->last_max_cwnd && fast_convergence)
        ca->last_max_cwnd = (tp->snd_cwnd * (BICTCP_BETA_SCALE + beta))
            / (2 * BICTCP_BETA_SCALE);
    else
        ca->last_max_cwnd = tp->snd_cwnd;

    ca->loss_cwnd = tp->snd_cwnd;

    if (tp->snd_cwnd <= low_window)
        return max(tp->snd_cwnd >> 1U, 2U);
    else
        return max((tp->snd_cwnd * beta) / BICTCP_BETA_SCALE, 2U);
}

  这里面最核心的就是Fast Convergence机制了!在网络中,一旦有新的节点加入通信,肯定是要占用网络带宽的,势必需要其他正在通信的节点让渡出部分带宽。为了让旧节点尽快释放带宽,这里采用了Fast Convergence机制:每次发生丢包后,会对比此次丢包时拥塞窗口的大小和之前的拥塞窗口大小,如小于上次的拥塞窗口,说明有新节点加入通信,占用了部分带宽。此时旧节点需要多留一些带宽给新节点使用,以使得网络尽快收敛到稳定状态!

(3)当发生丢包时,记录当前窗口为W-max, 减小窗口后的大小为W,BIC算法就是根据这个原理在(W, W-max]区间内做二分搜索;当接近于W-max时曲线应该更平滑,当离W-max较远的时候,曲线可以更加陡峭;

/*
 * Compute congestion window to use.
   二分法重新计算拥塞窗口
 */
static inline void bictcp_update(struct bictcp *ca, u32 cwnd)
{
    if (ca->last_cwnd == cwnd &&
        (s32)(tcp_time_stamp - ca->last_time) <= HZ / 32)
        return;

    ca->last_cwnd = cwnd;
    ca->last_time = tcp_time_stamp;

    if (ca->epoch_start == 0) /* record the beginning of an epoch */
        ca->epoch_start = tcp_time_stamp;

    /* start off normal */
    if (cwnd <= low_window) {
        ca->cnt = cwnd;
        return;
    }

    /* binary increase :二分增加;相比reno的线性增加,这里的效率明显高很多*/
    if (cwnd < ca->last_max_cwnd) {
        __u32    dist = (ca->last_max_cwnd - cwnd)
            / BICTCP_B;

        if (dist > max_increment)
            /* linear increase */
            ca->cnt = cwnd / max_increment;
        else if (dist <= 1U)
            /* binary search increase */
            ca->cnt = (cwnd * smooth_part) / BICTCP_B;
        else
            /* binary search increase */
            ca->cnt = cwnd / dist;
    } else {
        /* slow start AMD linear increase */
        if (cwnd < ca->last_max_cwnd + BICTCP_B)
            /* slow start */
            ca->cnt = (cwnd * smooth_part) / BICTCP_B;
        else if (cwnd < ca->last_max_cwnd + max_increment*(BICTCP_B-1))
            /* slow start */
            ca->cnt = (cwnd * (BICTCP_B-1))
                / (cwnd - ca->last_max_cwnd);
        else
            /* linear increase */
            ca->cnt = cwnd / max_increment;
    }

    /* if in slow start or link utilization is very low */
    if (ca->last_max_cwnd == 0) {
        if (ca->cnt > 20) /* increase cwnd 5% per RTT */
            ca->cnt = 20;
    }

    ca->cnt = (ca->cnt << ACK_RATIO_SHIFT) / ca->delayed_ack;
    if (ca->cnt == 0)            /* cannot be zero */
        ca->cnt = 1;
}

static void bictcp_cong_avoid(struct sock *sk, u32 ack, u32 acked)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct bictcp *ca = inet_csk_ca(sk);

    if (!tcp_is_cwnd_limited(sk))
        return;

    if (tcp_in_slow_start(tp))
        tcp_slow_start(tp, acked);
    else {
        bictcp_update(ca, tp->snd_cwnd);//计算ca->cnt, 表示增加一个cwnd需要的ack数量
        tcp_cong_avoid_ai(tp, ca->cnt, 1);// 根据ca->cnt计算snd_cwnd
    }
}

  计算snd_cwnd值:

/* In theory this is tp->snd_cwnd += 1 / tp->snd_cwnd (or alternative w),
 * for every packet that was ACKed.
 */
void tcp_cong_avoid_ai(struct tcp_sock *tp, u32 w, u32 acked)
{
    /* If credits accumulated at a higher w, apply them gently now. */
    /*如果 w 很大,那么, snd_cwnd_cnt 可能会积累为一个很大的值;
      此后, w 由于种种原因突然被缩小了很多。那么下面计算处理的 delta 就会很大。
      这可能导致流量的爆发。为了避免这种情况,这里提前增加了一个特判
    */
    if (tp->snd_cwnd_cnt >= w) {
        tp->snd_cwnd_cnt = 0;
        tp->snd_cwnd++;
    }
    /* 累计被确认的包的数目 */
    tp->snd_cwnd_cnt += acked;
    if (tp->snd_cwnd_cnt >= w) {
        /* 窗口增大的大小应当为被确认的包的数目除以当前窗口大小。
        * 以往都是直接加一,但直接加一并不是正确的加法增加 (AI) 的实现。
        * 例如, w 为 10, acked 为 20 时,应当增加 20/10=2,而不是 1。
        */
        u32 delta = tp->snd_cwnd_cnt / w;

        tp->snd_cwnd_cnt -= delta * w;
        tp->snd_cwnd += delta;
    }
    tp->snd_cwnd = min(tp->snd_cwnd, tp->snd_cwnd_clamp);
}

  (4)重新计算epoch_start的值:

/*  如果目前没有任何数据包在传输了,那么需要重新设定epoch_start。这个是为了
    解决当应用程序在一段时间内不发送任何数据时, now-epoch_start 会变得很大,由此,
    根据 Cubic 函数计算出来的目标拥塞窗口值也会变得很大。但显然,这是一个错误。因此,
    需要在应用程序重新开始发送数据时,重置epoch_start 的值。在这里CA_EVENT_TX_START事
    件表明目前所有的包都已经被确认了(即没有任何正在传输的包),而应用程序又开始
    发送新的数据包了。所有的包都被确认说明应用程序有一段时间没有发包。因而,在程
    序又重新开始发包时,需要重新设定 epoch_start的值,以便在计算拥塞窗口的大小时,
    仍能合理地遵循 cubic 函数的曲线*/
static void bictcp_cwnd_event(struct sock *sk, enum tcp_ca_event event)
{
    if (event == CA_EVENT_TX_START) {
        struct bictcp *ca = inet_csk_ca(sk);
        u32 now = tcp_time_stamp;
        s32 delta;

        delta = now - tcp_sk(sk)->lsndtime;

        /* We were application limited (idle) for a while.
         * Shift epoch_start to keep cwnd growth to cubic curve.
         */
        if (ca->epoch_start && delta > 0) {
            ca->epoch_start += delta;
            if (after(ca->epoch_start, now))
                ca->epoch_start = now;
        }
        return;
    }
}

  (5)发送到收到ACK后,需要重新计算链路的延迟情况,以确认后续的各种窗口

/* Track delayed acknowledgment ratio using sliding window
 * ratio = (15*ratio + sample) / 16
 */
static void bictcp_acked(struct sock *sk, const struct ack_sample *sample)
{
    const struct inet_connection_sock *icsk = inet_csk(sk);

    if (icsk->icsk_ca_state == TCP_CA_Open) {
        struct bictcp *ca = inet_csk_ca(sk);

        ca->delayed_ack += sample->pkts_acked -
            (ca->delayed_ack >> ACK_RATIO_SHIFT);
    }
}

总结:

1、拥塞控制本质:

(1)目的:控制发送方发数据包的速度,避免少数节点无节制发数据导致整个网络被占满,其他节点无法通信;但如果过度节制又会导致网络大幅空闲,利用率降低,所以需要在拥塞和利用率之间找到平衡!

(2)节点新加入时,不知道网络是否拥塞,只能不停地探测:依次同时发送1、2、4、8个.....(按照2^N增长)数据包!当然,这种指数级别的增长肯定是会到头的,达到以下3个条件之一后就要考虑减少同时发送的数据包了:

    达到了人为事先设置的ssthresh值
    等待的ack超时,可能是发送的包丢了,也可能是ack的包丢了;当然也可能没丢,还在路上了!
    收到多个(一般是3个)冗余的ack,说明对方没收到前序的某个包

(3)减少同时发送的数据包,具体减少到多少了?一般是从一半的量重新开始!比如同时发送16个数据包时触发了上述3个条件之一,那么重新从每次发送8个数据包开始探测!

(4)重新开始探测后,每次同时发送的数据包需要增加多少个了?reno是线性增加,BIC是二分增加,cubic用的是3次方的公式计算增加值!   

     reno的线性增加:
FluxBB bbcode 测试

     bic的二分增加:
FluxBB bbcode 测试

    cubic的三次函数:
FluxBB bbcode 测试

2、jhash是个比较好的hash算法,这个版本的实现如下:

/* jhash - hash an arbitrary key
 * @k: sequence of bytes as key
 * @length: the length of the key
 * @initval: the previous hash, or an arbitray value
 *
 * The generic version, hashes an arbitrary sequence of bytes.
 * No alignment or length assumptions are made about the input key.
 *
 * Returns the hash value of the key. The result depends on endianness.
 */
static inline u32 jhash(const void *key, u32 length, u32 initval)
{
    u32 a, b, c;
    const u8 *k = key;

    /* Set up the internal state */
    a = b = c = JHASH_INITVAL + length + initval;

    /* All but the last block: affect some 32 bits of (a,b,c) */
    while (length > 12) {
        a += __get_unaligned_cpu32(k);
        b += __get_unaligned_cpu32(k + 4);
        c += __get_unaligned_cpu32(k + 8);
        __jhash_mix(a, b, c);
        length -= 12;
        k += 12;
    }
    /* Last block: affect all 32 bits of (c) */
    /* All the case statements fall through */
    switch (length) {
    case 12: c += (u32)k[11]<<24;
    case 11: c += (u32)k[10]<<16;
    case 10: c += (u32)k[9]<<8;
    case 9:  c += k[8];
    case 8:  b += (u32)k[7]<<24;
    case 7:  b += (u32)k[6]<<16;
    case 6:  b += (u32)k[5]<<8;
    case 5:  b += k[4];
    case 4:  a += (u32)k[3]<<24;
    case 3:  a += (u32)k[2]<<16;
    case 2:  a += (u32)k[1]<<8;
    case 1:  a += k[0];
         __jhash_final(a, b, c);
    case 0: /* Nothing left to add */
        break;
    }

    return c;
}

参考:

1、https://www.bilibili.com/video/BV1L4411a7RN?from=search&seid=9607714680684056910  拥塞控制

2、https://www.bilibili.com/video/BV1MU4y157SY/ cubic拥塞控制

3、https://www.cnblogs.com/huang-xiang/p/13226229.html  cubic拥塞控制算法

离线

页脚

Powered by FluxBB

本站由XREA提供空间支持