公告

Gentoo交流群:87709706 欢迎您的加入

#1 2022-10-11 11:58:22

batsom
管理团队
注册时间: 2022-08-03
帖子: 586
个人网站

linux源码解读(十八):红黑树在内核的应用——timer定时器

定时器都知道吧?个人认为是linux最核心的功能之一了!比如线程sleep(5000),5s后再唤醒执行,cpu是怎么知道5s的时间到了?还有nginx这种反向代理每隔一段时间都要检测客户端的是否还在,如果掉线了就没必要再分配资源维护连接关系啦。那么间隔固定时间检测心跳的定时机制是怎么实现的了?

       1、(1) linux系统和时间相关最核心的变量就是jiffies!在include\linux\raid\pq.h中的定义如下:

# define jiffies    raid6_jiffies()
#define HZ 1000
//返回当前时间的毫秒值,比如时间戳
static inline uint32_t raid6_jiffies(void)
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec*1000 //tv_sec是秒,乘以1000转成毫秒
            + tv.tv_usec/1000;//tv_usec是微秒,除以1000转成毫秒
}

  这段代码的信息量很大,最核心的有两点:

          有个宏定义是HZ,值是1000;既然HZ在这里和时间相关,猜都能猜到应该是毫秒单位,因为1s=1000ms;正规的定义:HZ表示1秒产生时钟中断的次数,这里是每秒产生1000次时钟中断,也就是每次时钟中断的间隔是1毫秒!
          gettimeofday函数把当前时间戳的值存放在了timeval结构体,两个字段分别是秒和微妙单位;最后raid6_jiffies统一转成毫秒单位返回。所以,这不就是个时间戳么?

       继续深入gettimeofday函数,发现最终是通过系统调用获取当前时间的!核心的结构体就是vsyscall_gtod_data!
FluxBB bbcode 测试

       如果是32位系统,jiffies最大值也就是2^32,超过这个值会产生溢出,回绕到0重新开始计数!所以为了处理回绕问题,linux内核专门提供了比较的方法:

#define time_after(a,b)         \
        (typecheck(unsigned long, a) && \
         typecheck(unsigned long, b) && \
         ((long)(b) - (long)(a) < 0))
#define time_before(a,b)        time_after(b,a)
#define time_after_eq(a,b)      \
        (typecheck(unsigned long, a) && \
         typecheck(unsigned long, b) && \
         ((long)(a) - (long)(b) >= 0))
#define time_before_eq(a,b)     time_after_eq(b,a)

       (2) 定时器、定时器,本质就是在特定的时间干特定的活!比如社畜码农早上7:20起床,7:50上班车,9点打开电脑开始搬砖等!在linux系统内核,怎么把特定时间和特定的动作关联在一起了?C++可以创建一个类,成员变量是时间,成员函数是特定的动作;linux内核用C写的,用结构体也能完成同样的功能。这一切都是用timer_list结构体实现的,如下:

struct timer_list {
    /*
     * All fields that change during normal runtime grouped to the
     * same cacheline
     */
    struct hlist_node    entry;//链表结构,串接timer_list
    unsigned long        expires;//到期时间,一般用jiffies+5*HZ:表示5秒后触发定时器的回到函数
    void            (*function)(unsigned long);//定时器时间到后的回调函数
    unsigned long        data;//回调函数的参数
    u32            flags;

#ifdef CONFIG_TIMER_STATS
    int            start_pid;
    void            *start_site;
    char            start_comm[16];
#endif
#ifdef CONFIG_LOCKDEP
    struct lockdep_map    lockdep_map;
#endif
};

       结构体定义好了,该怎么用了?先看一段demo代码,形象理解一下timer的用法:

#include <linux/module.h>
#include <linux/timer.h>
#include <linux/jiffies.h>
 
void time_pre(struct timer_list *timer);
struct timer_list mytimer;
// DEFINE_TIMER(mytimer, time_pre);
void time_pre(struct timer_list *timer)
{
    printk("%s\n", __func__);
    mytimer.expires = jiffies + 500 * HZ/1000;  // 500ms 运行一次
    mod_timer(&mytimer, mytimer.expires);       // 2.2 如果需要周期性执行任务,在定时器回调函数中添加 mod_timer
}
 
// 驱动接口
int __init chr_init(void)
{
    timer_setup(&mytimer, time_pre, 0);        // 1. 初始化
    mytimer.expires = jiffies + 500 * HZ/1000;  //0.5秒触发一次
    add_timer(&mytimer);                       // 2.1 向内核中添加定时器
    printk("init success\n");
    return 0;
}
 
void __exit chr_exit(void)
{
    if(timer_pending(&mytimer))
    {
        del_timer(&mytimer);    // 3.释放定时器
    }
    printk("exit Success \n");
}
 
module_init(chr_init);
module_exit(chr_exit);
 
MODULE_LICENSE("GPL");
MODULE_AUTHOR("XXX");
MODULE_DESCRIPTION("a simple char device example");

      大概的思路: 生成timer_list结构体并初始化,然后用add_timer注册刚才初始化的timer,等expire时间到后就调用callback回调函数!思路很简单,从这里能看到核心的函数时add_timer和mod_timer函数,而这两个函数最终都调用了__mod_timer函数;从函数的源码看,用的都是队列来组织timer的,传说中的红黑树了?
FluxBB bbcode 测试

      上面的这些定时器都是依赖HZ的,这种定时器称之为低分辨率定时器,从名字就能看出来精度不高。低分辨率定时器使用的timer wheel机制来管理系统中定时器。在timer wheel的机制下,系统中的定时器不是使用单一的链表进行管理。为了达到高效访问,并且消耗极小的cpu资源,linux系统采用了五个链表数组来进行管理(原理和进程调度的O(1)算法类似,把原来单一的队列按照优先级分成若干个):五个数组之间就像手表一样存在分秒进位的操作。在tv1中存放这timer_jiffies到timer_jiffies+256,也就是说tv1存放定时器范围为0-255。如果在每一个tick上可能有多个相同的定时器都要处理,这时候就使用链表将相同的定时器串在一起超时的时候处理。tv2有64个单元,每个单元有256个tick,因此tv2的超时范围为256-256*64-1(2^14 -1), 就这样一次类推到tv3, tv4, tv5上,各个tv的范围如下:
数组    idx范围
tv1   0--2^8-1
tv2    2^8--2^14-1
tv3    2^14--2^20-1
tv4    2^20--2^26-1
tv5    2^26--2^32-1
  整个timer wheel图示如下:
FluxBB bbcode 测试

   2、(1)低精度定时器已经不适用于某些要求高的场景了(比如看门狗、usb、ethernet、块设备、kvm等子系统);为了提高精度,同时兼容低版本的linux内核,需要重新设计新的定时器,取名为hrtimer。和hrtimer配套的结构体有好几个,为了直观感受这些结构体的关系,这里用下图表示:
FluxBB bbcode 测试

       (2)hrtimer结构体和timer_list类似,都有expire字段和回调函数字段。可以发现结构体之间地关系比较复杂,甚至还互相嵌套,怎么复杂的结构体,linux内核是怎么使用的了?

         不管是哪中定时器,首先要生成定时器的实例,主要记录expire时间和回调函数,所以先调用__hrtimer_init方法初始化定时器,代码如下:

static void __hrtimer_init(struct hrtimer *timer, clockid_t clock_id,
               enum hrtimer_mode mode)
{
    struct hrtimer_cpu_base *cpu_base;
    int base;

    memset(timer, 0, sizeof(struct hrtimer));
    //初始化hrtimer的base字段
    cpu_base = raw_cpu_ptr(&hrtimer_bases);

    if (clock_id == CLOCK_REALTIME && mode != HRTIMER_MODE_ABS)
        clock_id = CLOCK_MONOTONIC;

    base = hrtimer_clockid_to_base(clock_id);
    timer->base = &cpu_base->clock_base[base];
    //初始化红黑树的node节点
    timerqueue_init(&timer->node);

#ifdef CONFIG_TIMER_STATS
    timer->start_site = NULL;
    timer->start_pid = -1;
    memset(timer->start_comm, 0, TASK_COMM_LEN);
#endif
}

  核心功能就是给base字段的属性赋值,然后初始化红黑树的node节点!随后就是把该节点加入红黑树了,便于后续动态快速地增删改查定时器!构建红黑树的函数是hrtimer_start_range_ns,代码如下:

/**
 * hrtimer_start_range_ns - (re)start an hrtimer on the current CPU
 * @timer:    the timer to be added
 * @tim:    expiry time
 * @delta_ns:    "slack" range for the timer
 * @mode:    expiry mode: absolute (HRTIMER_MODE_ABS) or
 *        relative (HRTIMER_MODE_REL)
 */
void hrtimer_start_range_ns(struct hrtimer *timer, ktime_t tim,
                u64 delta_ns, const enum hrtimer_mode mode)
{
    struct hrtimer_clock_base *base, *new_base;
    unsigned long flags;
    int leftmost;

    base = lock_hrtimer_base(timer, &flags);

    /* Remove an active timer from the queue:
    最终是调用timerqueue_del函数从红黑树删除
    */
    remove_hrtimer(timer, base, true);
    /* 如果是相对时间,则需要加上当前时间,因为内部是使用绝对时间 */ 
    if (mode & HRTIMER_MODE_REL)
        tim = ktime_add_safe(tim, base->get_time());

    tim = hrtimer_update_lowres(timer, tim, mode);
    /* 设置到期的时间范围 */  
    hrtimer_set_expires_range_ns(timer, tim, delta_ns);

    /* Switch the timer base, if necessary: */
    new_base = switch_hrtimer_base(timer, base, mode & HRTIMER_MODE_PINNED);

    timer_stats_hrtimer_set_start_info(timer);
    /* 把hrtime按到期时间排序,加入到对应时间基准系统的红黑树中 */  
    /* 如果该定时器的是最早到期的,将会返回true 
    最终调用的是timerqueue_add函数
    */  
    leftmost = enqueue_hrtimer(timer, new_base);
    if (!leftmost)
        goto unlock;

    if (!hrtimer_is_hres_active(timer)) {
        /*
         * Kick to reschedule the next tick to handle the new timer
         * on dynticks target.
         */
        if (new_base->cpu_base->nohz_active)
            wake_up_nohz_cpu(new_base->cpu_base->cpu);
    } else {
        hrtimer_reprogram(timer, new_base);
    }
unlock:
    unlock_hrtimer_base(timer, &flags);
}

  (3)定时器的红黑树建好后,该怎么用了?既然和时间相关,必然绕不开的机制:时钟中断!计算机主板上有种特殊的硬件,每间隔相同的时长就会给cpu发出脉冲信号,作用相当于计算机的“脉搏”;cpu收到这个信号后可以做出某些动作回应,这个机制就是时钟中断。最常见的时钟中断动作就是进程切换了!然而除此之外,时钟中断还有个非常重要的作用:触发和管理定时器!回顾一下上述的结构体定义和使用流程,会发现多个进程可能会需要在同一时间触发定时器,图示如下:比如在第1s的时候,进程A和B都有定时器需要被触发;再比如第7s的时候,进程A、E、F也都有定时器需要被触发,操作系统都是怎么知道应该在什么时候触发哪些定时器的了?

https://img2022.cnblogs.com/blog/2052730/202201/2052730-20220128120520815-1974947758.png

       此时红黑树的作用就凸显了:每次发生时钟中断,除了必要的进程/线程切换,还需要检查红黑树,看看最左边节点的expire是不是已经到了,如果还没有就不处理,等下一个时钟中断再检查;如果已经到了,就执行该节点的回调函数,同时删除该节点;这个过程是在hrtimer_interrupt中执行的,该函数代码如下:

/*
 * High resolution timer interrupt
 * Called with interrupts disabled
 */
void hrtimer_interrupt(struct clock_event_device *dev)
{
    struct hrtimer_cpu_base *cpu_base = this_cpu_ptr(&hrtimer_bases);
    ktime_t expires_next, now, entry_time, delta;
    int retries = 0;

    BUG_ON(!cpu_base->hres_active);
    cpu_base->nr_events++;
    dev->next_event.tv64 = KTIME_MAX;

    raw_spin_lock(&cpu_base->lock);
    entry_time = now = hrtimer_update_base(cpu_base);
retry:
    cpu_base->in_hrtirq = 1;
    /*
     * We set expires_next to KTIME_MAX here with cpu_base->lock
     * held to prevent that a timer is enqueued in our queue via
     * the migration code. This does not affect enqueueing of
     * timers which run their callback and need to be requeued on
     * this CPU.
     */
    cpu_base->expires_next.tv64 = KTIME_MAX;
    /*查看红黑树的最下节点,如果到期就执行回调函数,并删除该节点*/
    __hrtimer_run_queues(cpu_base, now);

    /* Reevaluate the clock bases for the next expiry 
    找到下一个到期的定时器
    */
    expires_next = __hrtimer_get_next_event(cpu_base);
    /*
     * Store the new expiry value so the migration code can verify
     * against it.
     */
    cpu_base->expires_next = expires_next;
    cpu_base->in_hrtirq = 0;
    raw_spin_unlock(&cpu_base->lock);

    /* Reprogramming necessary ? */
    if (!tick_program_event(expires_next, 0)) {
        cpu_base->hang_detected = 0;
        return;
    }

    /*
     * The next timer was already expired due to:
     * - tracing
     * - long lasting callbacks
     * - being scheduled away when running in a VM
     *
     * We need to prevent that we loop forever in the hrtimer
     * interrupt routine. We give it 3 attempts to avoid
     * overreacting on some spurious event.
     *
     * Acquire base lock for updating the offsets and retrieving
     * the current time.
     */
    raw_spin_lock(&cpu_base->lock);
    now = hrtimer_update_base(cpu_base);
    cpu_base->nr_retries++;
    if (++retries < 3)
        goto retry;
    /*
     * Give the system a chance to do something else than looping
     * here. We stored the entry time, so we know exactly how long
     * we spent here. We schedule the next event this amount of
     * time away.
     */
    cpu_base->nr_hangs++;
    cpu_base->hang_detected = 1;
    raw_spin_unlock(&cpu_base->lock);
    delta = ktime_sub(now, entry_time);
    if ((unsigned int)delta.tv64 > cpu_base->max_hang_time)
        cpu_base->max_hang_time = (unsigned int) delta.tv64;
    /*
     * Limit it to a sensible value as we enforce a longer
     * delay. Give the CPU at least 100ms to catch up.
     */
    if (delta.tv64 > 100 * NSEC_PER_MSEC)
        expires_next = ktime_add_ns(now, 100 * NSEC_PER_MSEC);
    else
        expires_next = ktime_add(now, delta);
    tick_program_event(expires_next, 1);
    printk_once(KERN_WARNING "hrtimer: interrupt took %llu ns\n",
            ktime_to_ns(delta));
}

   最重要的就是__hrtimer_run_queues以及继续调用的__run_hrtimer函数了,代码如下,重要部分加了中文注释:

/*
 * The write_seqcount_barrier()s in __run_hrtimer() split the thing into 3
 * distinct sections:
 *
 *  - queued:    the timer is queued
 *  - callback:    the timer is being ran
 *  - post:    the timer is inactive or (re)queued
 *
 * On the read side we ensure we observe timer->state and cpu_base->running
 * from the same section, if anything changed while we looked at it, we retry.
 * This includes timer->base changing because sequence numbers alone are
 * insufficient for that.
 *
 * The sequence numbers are required because otherwise we could still observe
 * a false negative if the read side got smeared over multiple consequtive
 * __run_hrtimer() invocations.
 */

static void __run_hrtimer(struct hrtimer_cpu_base *cpu_base,
              struct hrtimer_clock_base *base,
              struct hrtimer *timer, ktime_t *now)
{
    enum hrtimer_restart (*fn)(struct hrtimer *);
    int restart;

    lockdep_assert_held(&cpu_base->lock);

    debug_deactivate(timer);
    cpu_base->running = timer;

    /*
     * Separate the ->running assignment from the ->state assignment.
     *
     * As with a regular write barrier, this ensures the read side in
     * hrtimer_active() cannot observe cpu_base->running == NULL &&
     * timer->state == INACTIVE.
     */
    raw_write_seqcount_barrier(&cpu_base->seq);
    /*调用timerqueue_del从红黑树删除节点*/
    __remove_hrtimer(timer, base, HRTIMER_STATE_INACTIVE, 0);
    timer_stats_account_hrtimer(timer);
    /*最重要的回调函数*/
    fn = timer->function;

    /*
     * Clear the 'is relative' flag for the TIME_LOW_RES case. If the
     * timer is restarted with a period then it becomes an absolute
     * timer. If its not restarted it does not matter.
     */
    if (IS_ENABLED(CONFIG_TIME_LOW_RES))
        timer->is_rel = false;

    /*
     * Because we run timers from hardirq context, there is no chance
     * they get migrated to another cpu, therefore its safe to unlock
     * the timer base. 
     定时器是被硬件层面的时钟中断触发的,所以这个回调函数肯定是当前cpu执行的
     */
    raw_spin_unlock(&cpu_base->lock);
    trace_hrtimer_expire_entry(timer, now);
    //终于执行了回调函数
    restart = fn(timer);
    trace_hrtimer_expire_exit(timer);
    raw_spin_lock(&cpu_base->lock);

    /*
     * Note: We clear the running state after enqueue_hrtimer and
     * we do not reprogram the event hardware. Happens either in
     * hrtimer_start_range_ns() or in hrtimer_interrupt()
     *
     * Note: Because we dropped the cpu_base->lock above,
     * hrtimer_start_range_ns() can have popped in and enqueued the timer
     * for us already.
     */
    if (restart != HRTIMER_NORESTART &&
        !(timer->state & HRTIMER_STATE_ENQUEUED))
        enqueue_hrtimer(timer, base);//调用timerqueue_add把timer加入红黑树

    /*
     * Separate the ->running assignment from the ->state assignment.
     *
     * As with a regular write barrier, this ensures the read side in
     * hrtimer_active() cannot observe cpu_base->running == NULL &&
     * timer->state == INACTIVE.
     */
    raw_write_seqcount_barrier(&cpu_base->seq);

    WARN_ON_ONCE(cpu_base->running != timer);
    cpu_base->running = NULL;
}

static void __hrtimer_run_queues(struct hrtimer_cpu_base *cpu_base, ktime_t now)
{
    struct hrtimer_clock_base *base = cpu_base->clock_base;
    unsigned int active = cpu_base->active_bases;
    /*遍历各个时间基准系统,查询每个hrtimer_clock_base对应红黑树的左下节点,
    判断它的时间是否到期,如果到期,通过__run_hrtimer函数,对到期定时器进行处理,
    包括:调用定时器的回调函数、从红黑树中移除该定时器、
           根据回调函数的返回值决定是否重新启动该定时器*/
    for (; active; base++, active >>= 1) {
        struct timerqueue_node *node;
        ktime_t basenow;

        if (!(active & 0x01))
            continue;

        basenow = ktime_add(now, base->offset);
        /*
        返回红黑树中的左下节点,之所以可以在while循环中使用该函数,
        是因为__run_hrtimer会在移除旧的左下节点时,
        新的左下节点会被更新到base->active->next字段中,
        使得循环可以继续执行,直到没有新的到期定时器为止
        */
        while ((node = timerqueue_getnext(&base->active))) {
            struct hrtimer *timer;

            timer = container_of(node, struct hrtimer, node);

            /*
             * The immediate goal for using the softexpires is
             * minimizing wakeups, not running timers at the
             * earliest interrupt after their soft expiration.
             * This allows us to avoid using a Priority Search
             * Tree, which can answer a stabbing querry for
             * overlapping intervals and instead use the simple
             * BST we already have.
             * We don't add extra wakeups by delaying timers that
             * are right-of a not yet expired timer, because that
             * timer will have to trigger a wakeup anyway.
             */
            if (basenow.tv64 < hrtimer_get_softexpires_tv64(timer))
                break;

            __run_hrtimer(cpu_base, base, timer, &basenow);
        }
    }
}

  lib\timerqueue.c文件中比较重要的3个工具函数:都是红黑树常规的操作!

/**
 * timerqueue_add - Adds timer to timerqueue.
 *
 * @head: head of timerqueue
 * @node: timer node to be added
 *
 * Adds the timer node to the timerqueue, sorted by the
 * node's expires value.
 */
bool timerqueue_add(struct timerqueue_head *head, struct timerqueue_node *node)
{
    struct rb_node **p = &head->head.rb_node;
    struct rb_node *parent = NULL;
    struct timerqueue_node  *ptr;

    /* Make sure we don't add nodes that are already added */
    WARN_ON_ONCE(!RB_EMPTY_NODE(&node->node));

    while (*p) {
        parent = *p;
        ptr = rb_entry(parent, struct timerqueue_node, node);
        if (node->expires.tv64 < ptr->expires.tv64)
            p = &(*p)->rb_left;
        else
            p = &(*p)->rb_right;
    }
    rb_link_node(&node->node, parent, p);
    rb_insert_color(&node->node, &head->head);

    if (!head->next || node->expires.tv64 < head->next->expires.tv64) {
        head->next = node;
        return true;
    }
    return false;
}
EXPORT_SYMBOL_GPL(timerqueue_add);

/**
 * timerqueue_del - Removes a timer from the timerqueue.
 *
 * @head: head of timerqueue
 * @node: timer node to be removed
 *
 * Removes the timer node from the timerqueue.
 */
bool timerqueue_del(struct timerqueue_head *head, struct timerqueue_node *node)
{
    WARN_ON_ONCE(RB_EMPTY_NODE(&node->node));

    /* update next pointer */
    if (head->next == node) {
        struct rb_node *rbn = rb_next(&node->node);

        head->next = rbn ?
            rb_entry(rbn, struct timerqueue_node, node) : NULL;
    }
    rb_erase(&node->node, &head->head);
    RB_CLEAR_NODE(&node->node);
    return head->next != NULL;
}
EXPORT_SYMBOL_GPL(timerqueue_del);

/**
 * timerqueue_iterate_next - Returns the timer after the provided timer
 *
 * @node: Pointer to a timer.
 *
 * Provides the timer that is after the given node. This is used, when
 * necessary, to iterate through the list of timers in a timer list
 * without modifying the list.
 */
struct timerqueue_node *timerqueue_iterate_next(struct timerqueue_node *node)
{
    struct rb_node *next;

    if (!node)
        return NULL;
    next = rb_next(&node->node);
    if (!next)
        return NULL;
    return container_of(next, struct timerqueue_node, node);
}
EXPORT_SYMBOL_GPL(timerqueue_iterate_next);

    整个函数的调用过程:tick_program_event(注册clock_event_device)->hrtimer_inerrupt->__hrtimer_run_queues->__run_hrtimer

  (4)定时器中最重要的莫过于执行回调函数了,研发人员设计了这么复杂的流程、结构体,最终的目的不就是为了在正确的时间执行正确的回调函数么? 由于回调函数是异步执行的,它类似于一种“软件中断”,而且是处于非进程的上下文中,所以回调函数有以下3点需要注意:

    没有 current 指针、不允许访问用户空间。因为没有进程上下文,相关代码和被中断的进程没有任何联系。
    不能执行休眠(或可能引起休眠的函数)和调度。
    任何被访问的数据结构都应该针对并发访问进行保护,以防止竞争条件。

3、为什么hrtimer比timer_list精度高?

      (1)低分辨率定时器的计时单位基于jiffies值的计数,也就是说,它的精度只有1/HZ。假如内核配置的HZ是1000,那意味着系统中的低分辨率定时器的精度就是1ms; 那么问题来了,为了提高精度,为啥不把HZ值设置地更大了?比如10000、1000000等?提高时钟中断频率也会产生副作用,中断频率越高,系统的负担就增加了,处理器需要花时间来执行中断处理程序,中断处理器占用cpu时间越多。这样处理器执行其他工作的时间及越少,并且还会打乱处理器高速缓存(进程切换导致地)。所以选择时钟中断频率时要考虑多方面,要取得各方面的折中的一个合适频率。

   (2)大部分时间里,time wheel可以实现O(1)时间复杂度。但是当有进位发生时,不可预测的O(N)定时器级联迁移时间大大地影响了定时器的精度;刚好红黑树的增删改查时间复杂度可以控制在O(lgN),再加上硬件的计数进步,所以可以比较好地把精度控制在纳秒级别!

     

参考:

1、https://blog.csdn.net/droidphone/article/details/8051405  低分辨率定时器的原理

2、https://blog.csdn.net/hongzg1982/article/details/54881361  高精度定时器hrtimer的原理

3、https://cloud.tencent.com/developer/article/1603333?from=15425 内核低分辨率定时器的实现

4、https://zhuanlan.zhihu.com/p/83078387 高精度定时器原理简介

离线

页脚

Powered by FluxBB

本站由XREA提供空间支持