页次: 1
定时器都知道吧?个人认为是linux最核心的功能之一了!比如线程sleep(5000),5s后再唤醒执行,cpu是怎么知道5s的时间到了?还有nginx这种反向代理每隔一段时间都要检测客户端的是否还在,如果掉线了就没必要再分配资源维护连接关系啦。那么间隔固定时间检测心跳的定时机制是怎么实现的了?
1、(1) linux系统和时间相关最核心的变量就是jiffies!在include\linux\raid\pq.h中的定义如下:
# define jiffies raid6_jiffies()
#define HZ 1000
//返回当前时间的毫秒值,比如时间戳
static inline uint32_t raid6_jiffies(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec*1000 //tv_sec是秒,乘以1000转成毫秒
+ tv.tv_usec/1000;//tv_usec是微秒,除以1000转成毫秒
}
这段代码的信息量很大,最核心的有两点:
有个宏定义是HZ,值是1000;既然HZ在这里和时间相关,猜都能猜到应该是毫秒单位,因为1s=1000ms;正规的定义:HZ表示1秒产生时钟中断的次数,这里是每秒产生1000次时钟中断,也就是每次时钟中断的间隔是1毫秒!
gettimeofday函数把当前时间戳的值存放在了timeval结构体,两个字段分别是秒和微妙单位;最后raid6_jiffies统一转成毫秒单位返回。所以,这不就是个时间戳么?
继续深入gettimeofday函数,发现最终是通过系统调用获取当前时间的!核心的结构体就是vsyscall_gtod_data!
如果是32位系统,jiffies最大值也就是2^32,超过这个值会产生溢出,回绕到0重新开始计数!所以为了处理回绕问题,linux内核专门提供了比较的方法:
#define time_after(a,b) \
(typecheck(unsigned long, a) && \
typecheck(unsigned long, b) && \
((long)(b) - (long)(a) < 0))
#define time_before(a,b) time_after(b,a)
#define time_after_eq(a,b) \
(typecheck(unsigned long, a) && \
typecheck(unsigned long, b) && \
((long)(a) - (long)(b) >= 0))
#define time_before_eq(a,b) time_after_eq(b,a)
(2) 定时器、定时器,本质就是在特定的时间干特定的活!比如社畜码农早上7:20起床,7:50上班车,9点打开电脑开始搬砖等!在linux系统内核,怎么把特定时间和特定的动作关联在一起了?C++可以创建一个类,成员变量是时间,成员函数是特定的动作;linux内核用C写的,用结构体也能完成同样的功能。这一切都是用timer_list结构体实现的,如下:
struct timer_list {
/*
* All fields that change during normal runtime grouped to the
* same cacheline
*/
struct hlist_node entry;//链表结构,串接timer_list
unsigned long expires;//到期时间,一般用jiffies+5*HZ:表示5秒后触发定时器的回到函数
void (*function)(unsigned long);//定时器时间到后的回调函数
unsigned long data;//回调函数的参数
u32 flags;
#ifdef CONFIG_TIMER_STATS
int start_pid;
void *start_site;
char start_comm[16];
#endif
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
结构体定义好了,该怎么用了?先看一段demo代码,形象理解一下timer的用法:
#include <linux/module.h>
#include <linux/timer.h>
#include <linux/jiffies.h>
void time_pre(struct timer_list *timer);
struct timer_list mytimer;
// DEFINE_TIMER(mytimer, time_pre);
void time_pre(struct timer_list *timer)
{
printk("%s\n", __func__);
mytimer.expires = jiffies + 500 * HZ/1000; // 500ms 运行一次
mod_timer(&mytimer, mytimer.expires); // 2.2 如果需要周期性执行任务,在定时器回调函数中添加 mod_timer
}
// 驱动接口
int __init chr_init(void)
{
timer_setup(&mytimer, time_pre, 0); // 1. 初始化
mytimer.expires = jiffies + 500 * HZ/1000; //0.5秒触发一次
add_timer(&mytimer); // 2.1 向内核中添加定时器
printk("init success\n");
return 0;
}
void __exit chr_exit(void)
{
if(timer_pending(&mytimer))
{
del_timer(&mytimer); // 3.释放定时器
}
printk("exit Success \n");
}
module_init(chr_init);
module_exit(chr_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("XXX");
MODULE_DESCRIPTION("a simple char device example");
大概的思路: 生成timer_list结构体并初始化,然后用add_timer注册刚才初始化的timer,等expire时间到后就调用callback回调函数!思路很简单,从这里能看到核心的函数时add_timer和mod_timer函数,而这两个函数最终都调用了__mod_timer函数;从函数的源码看,用的都是队列来组织timer的,传说中的红黑树了?
上面的这些定时器都是依赖HZ的,这种定时器称之为低分辨率定时器,从名字就能看出来精度不高。低分辨率定时器使用的timer wheel机制来管理系统中定时器。在timer wheel的机制下,系统中的定时器不是使用单一的链表进行管理。为了达到高效访问,并且消耗极小的cpu资源,linux系统采用了五个链表数组来进行管理(原理和进程调度的O(1)算法类似,把原来单一的队列按照优先级分成若干个):五个数组之间就像手表一样存在分秒进位的操作。在tv1中存放这timer_jiffies到timer_jiffies+256,也就是说tv1存放定时器范围为0-255。如果在每一个tick上可能有多个相同的定时器都要处理,这时候就使用链表将相同的定时器串在一起超时的时候处理。tv2有64个单元,每个单元有256个tick,因此tv2的超时范围为256-256*64-1(2^14 -1), 就这样一次类推到tv3, tv4, tv5上,各个tv的范围如下:
数组 idx范围
tv1 0--2^8-1
tv2 2^8--2^14-1
tv3 2^14--2^20-1
tv4 2^20--2^26-1
tv5 2^26--2^32-1
整个timer wheel图示如下:
2、(1)低精度定时器已经不适用于某些要求高的场景了(比如看门狗、usb、ethernet、块设备、kvm等子系统);为了提高精度,同时兼容低版本的linux内核,需要重新设计新的定时器,取名为hrtimer。和hrtimer配套的结构体有好几个,为了直观感受这些结构体的关系,这里用下图表示:
(2)hrtimer结构体和timer_list类似,都有expire字段和回调函数字段。可以发现结构体之间地关系比较复杂,甚至还互相嵌套,怎么复杂的结构体,linux内核是怎么使用的了?
不管是哪中定时器,首先要生成定时器的实例,主要记录expire时间和回调函数,所以先调用__hrtimer_init方法初始化定时器,代码如下:
static void __hrtimer_init(struct hrtimer *timer, clockid_t clock_id,
enum hrtimer_mode mode)
{
struct hrtimer_cpu_base *cpu_base;
int base;
memset(timer, 0, sizeof(struct hrtimer));
//初始化hrtimer的base字段
cpu_base = raw_cpu_ptr(&hrtimer_bases);
if (clock_id == CLOCK_REALTIME && mode != HRTIMER_MODE_ABS)
clock_id = CLOCK_MONOTONIC;
base = hrtimer_clockid_to_base(clock_id);
timer->base = &cpu_base->clock_base[base];
//初始化红黑树的node节点
timerqueue_init(&timer->node);
#ifdef CONFIG_TIMER_STATS
timer->start_site = NULL;
timer->start_pid = -1;
memset(timer->start_comm, 0, TASK_COMM_LEN);
#endif
}
核心功能就是给base字段的属性赋值,然后初始化红黑树的node节点!随后就是把该节点加入红黑树了,便于后续动态快速地增删改查定时器!构建红黑树的函数是hrtimer_start_range_ns,代码如下:
/**
* hrtimer_start_range_ns - (re)start an hrtimer on the current CPU
* @timer: the timer to be added
* @tim: expiry time
* @delta_ns: "slack" range for the timer
* @mode: expiry mode: absolute (HRTIMER_MODE_ABS) or
* relative (HRTIMER_MODE_REL)
*/
void hrtimer_start_range_ns(struct hrtimer *timer, ktime_t tim,
u64 delta_ns, const enum hrtimer_mode mode)
{
struct hrtimer_clock_base *base, *new_base;
unsigned long flags;
int leftmost;
base = lock_hrtimer_base(timer, &flags);
/* Remove an active timer from the queue:
最终是调用timerqueue_del函数从红黑树删除
*/
remove_hrtimer(timer, base, true);
/* 如果是相对时间,则需要加上当前时间,因为内部是使用绝对时间 */
if (mode & HRTIMER_MODE_REL)
tim = ktime_add_safe(tim, base->get_time());
tim = hrtimer_update_lowres(timer, tim, mode);
/* 设置到期的时间范围 */
hrtimer_set_expires_range_ns(timer, tim, delta_ns);
/* Switch the timer base, if necessary: */
new_base = switch_hrtimer_base(timer, base, mode & HRTIMER_MODE_PINNED);
timer_stats_hrtimer_set_start_info(timer);
/* 把hrtime按到期时间排序,加入到对应时间基准系统的红黑树中 */
/* 如果该定时器的是最早到期的,将会返回true
最终调用的是timerqueue_add函数
*/
leftmost = enqueue_hrtimer(timer, new_base);
if (!leftmost)
goto unlock;
if (!hrtimer_is_hres_active(timer)) {
/*
* Kick to reschedule the next tick to handle the new timer
* on dynticks target.
*/
if (new_base->cpu_base->nohz_active)
wake_up_nohz_cpu(new_base->cpu_base->cpu);
} else {
hrtimer_reprogram(timer, new_base);
}
unlock:
unlock_hrtimer_base(timer, &flags);
}
(3)定时器的红黑树建好后,该怎么用了?既然和时间相关,必然绕不开的机制:时钟中断!计算机主板上有种特殊的硬件,每间隔相同的时长就会给cpu发出脉冲信号,作用相当于计算机的“脉搏”;cpu收到这个信号后可以做出某些动作回应,这个机制就是时钟中断。最常见的时钟中断动作就是进程切换了!然而除此之外,时钟中断还有个非常重要的作用:触发和管理定时器!回顾一下上述的结构体定义和使用流程,会发现多个进程可能会需要在同一时间触发定时器,图示如下:比如在第1s的时候,进程A和B都有定时器需要被触发;再比如第7s的时候,进程A、E、F也都有定时器需要被触发,操作系统都是怎么知道应该在什么时候触发哪些定时器的了?
https://img2022.cnblogs.com/blog/2052730/202201/2052730-20220128120520815-1974947758.png
此时红黑树的作用就凸显了:每次发生时钟中断,除了必要的进程/线程切换,还需要检查红黑树,看看最左边节点的expire是不是已经到了,如果还没有就不处理,等下一个时钟中断再检查;如果已经到了,就执行该节点的回调函数,同时删除该节点;这个过程是在hrtimer_interrupt中执行的,该函数代码如下:
/*
* High resolution timer interrupt
* Called with interrupts disabled
*/
void hrtimer_interrupt(struct clock_event_device *dev)
{
struct hrtimer_cpu_base *cpu_base = this_cpu_ptr(&hrtimer_bases);
ktime_t expires_next, now, entry_time, delta;
int retries = 0;
BUG_ON(!cpu_base->hres_active);
cpu_base->nr_events++;
dev->next_event.tv64 = KTIME_MAX;
raw_spin_lock(&cpu_base->lock);
entry_time = now = hrtimer_update_base(cpu_base);
retry:
cpu_base->in_hrtirq = 1;
/*
* We set expires_next to KTIME_MAX here with cpu_base->lock
* held to prevent that a timer is enqueued in our queue via
* the migration code. This does not affect enqueueing of
* timers which run their callback and need to be requeued on
* this CPU.
*/
cpu_base->expires_next.tv64 = KTIME_MAX;
/*查看红黑树的最下节点,如果到期就执行回调函数,并删除该节点*/
__hrtimer_run_queues(cpu_base, now);
/* Reevaluate the clock bases for the next expiry
找到下一个到期的定时器
*/
expires_next = __hrtimer_get_next_event(cpu_base);
/*
* Store the new expiry value so the migration code can verify
* against it.
*/
cpu_base->expires_next = expires_next;
cpu_base->in_hrtirq = 0;
raw_spin_unlock(&cpu_base->lock);
/* Reprogramming necessary ? */
if (!tick_program_event(expires_next, 0)) {
cpu_base->hang_detected = 0;
return;
}
/*
* The next timer was already expired due to:
* - tracing
* - long lasting callbacks
* - being scheduled away when running in a VM
*
* We need to prevent that we loop forever in the hrtimer
* interrupt routine. We give it 3 attempts to avoid
* overreacting on some spurious event.
*
* Acquire base lock for updating the offsets and retrieving
* the current time.
*/
raw_spin_lock(&cpu_base->lock);
now = hrtimer_update_base(cpu_base);
cpu_base->nr_retries++;
if (++retries < 3)
goto retry;
/*
* Give the system a chance to do something else than looping
* here. We stored the entry time, so we know exactly how long
* we spent here. We schedule the next event this amount of
* time away.
*/
cpu_base->nr_hangs++;
cpu_base->hang_detected = 1;
raw_spin_unlock(&cpu_base->lock);
delta = ktime_sub(now, entry_time);
if ((unsigned int)delta.tv64 > cpu_base->max_hang_time)
cpu_base->max_hang_time = (unsigned int) delta.tv64;
/*
* Limit it to a sensible value as we enforce a longer
* delay. Give the CPU at least 100ms to catch up.
*/
if (delta.tv64 > 100 * NSEC_PER_MSEC)
expires_next = ktime_add_ns(now, 100 * NSEC_PER_MSEC);
else
expires_next = ktime_add(now, delta);
tick_program_event(expires_next, 1);
printk_once(KERN_WARNING "hrtimer: interrupt took %llu ns\n",
ktime_to_ns(delta));
}
最重要的就是__hrtimer_run_queues以及继续调用的__run_hrtimer函数了,代码如下,重要部分加了中文注释:
/*
* The write_seqcount_barrier()s in __run_hrtimer() split the thing into 3
* distinct sections:
*
* - queued: the timer is queued
* - callback: the timer is being ran
* - post: the timer is inactive or (re)queued
*
* On the read side we ensure we observe timer->state and cpu_base->running
* from the same section, if anything changed while we looked at it, we retry.
* This includes timer->base changing because sequence numbers alone are
* insufficient for that.
*
* The sequence numbers are required because otherwise we could still observe
* a false negative if the read side got smeared over multiple consequtive
* __run_hrtimer() invocations.
*/
static void __run_hrtimer(struct hrtimer_cpu_base *cpu_base,
struct hrtimer_clock_base *base,
struct hrtimer *timer, ktime_t *now)
{
enum hrtimer_restart (*fn)(struct hrtimer *);
int restart;
lockdep_assert_held(&cpu_base->lock);
debug_deactivate(timer);
cpu_base->running = timer;
/*
* Separate the ->running assignment from the ->state assignment.
*
* As with a regular write barrier, this ensures the read side in
* hrtimer_active() cannot observe cpu_base->running == NULL &&
* timer->state == INACTIVE.
*/
raw_write_seqcount_barrier(&cpu_base->seq);
/*调用timerqueue_del从红黑树删除节点*/
__remove_hrtimer(timer, base, HRTIMER_STATE_INACTIVE, 0);
timer_stats_account_hrtimer(timer);
/*最重要的回调函数*/
fn = timer->function;
/*
* Clear the 'is relative' flag for the TIME_LOW_RES case. If the
* timer is restarted with a period then it becomes an absolute
* timer. If its not restarted it does not matter.
*/
if (IS_ENABLED(CONFIG_TIME_LOW_RES))
timer->is_rel = false;
/*
* Because we run timers from hardirq context, there is no chance
* they get migrated to another cpu, therefore its safe to unlock
* the timer base.
定时器是被硬件层面的时钟中断触发的,所以这个回调函数肯定是当前cpu执行的
*/
raw_spin_unlock(&cpu_base->lock);
trace_hrtimer_expire_entry(timer, now);
//终于执行了回调函数
restart = fn(timer);
trace_hrtimer_expire_exit(timer);
raw_spin_lock(&cpu_base->lock);
/*
* Note: We clear the running state after enqueue_hrtimer and
* we do not reprogram the event hardware. Happens either in
* hrtimer_start_range_ns() or in hrtimer_interrupt()
*
* Note: Because we dropped the cpu_base->lock above,
* hrtimer_start_range_ns() can have popped in and enqueued the timer
* for us already.
*/
if (restart != HRTIMER_NORESTART &&
!(timer->state & HRTIMER_STATE_ENQUEUED))
enqueue_hrtimer(timer, base);//调用timerqueue_add把timer加入红黑树
/*
* Separate the ->running assignment from the ->state assignment.
*
* As with a regular write barrier, this ensures the read side in
* hrtimer_active() cannot observe cpu_base->running == NULL &&
* timer->state == INACTIVE.
*/
raw_write_seqcount_barrier(&cpu_base->seq);
WARN_ON_ONCE(cpu_base->running != timer);
cpu_base->running = NULL;
}
static void __hrtimer_run_queues(struct hrtimer_cpu_base *cpu_base, ktime_t now)
{
struct hrtimer_clock_base *base = cpu_base->clock_base;
unsigned int active = cpu_base->active_bases;
/*遍历各个时间基准系统,查询每个hrtimer_clock_base对应红黑树的左下节点,
判断它的时间是否到期,如果到期,通过__run_hrtimer函数,对到期定时器进行处理,
包括:调用定时器的回调函数、从红黑树中移除该定时器、
根据回调函数的返回值决定是否重新启动该定时器*/
for (; active; base++, active >>= 1) {
struct timerqueue_node *node;
ktime_t basenow;
if (!(active & 0x01))
continue;
basenow = ktime_add(now, base->offset);
/*
返回红黑树中的左下节点,之所以可以在while循环中使用该函数,
是因为__run_hrtimer会在移除旧的左下节点时,
新的左下节点会被更新到base->active->next字段中,
使得循环可以继续执行,直到没有新的到期定时器为止
*/
while ((node = timerqueue_getnext(&base->active))) {
struct hrtimer *timer;
timer = container_of(node, struct hrtimer, node);
/*
* The immediate goal for using the softexpires is
* minimizing wakeups, not running timers at the
* earliest interrupt after their soft expiration.
* This allows us to avoid using a Priority Search
* Tree, which can answer a stabbing querry for
* overlapping intervals and instead use the simple
* BST we already have.
* We don't add extra wakeups by delaying timers that
* are right-of a not yet expired timer, because that
* timer will have to trigger a wakeup anyway.
*/
if (basenow.tv64 < hrtimer_get_softexpires_tv64(timer))
break;
__run_hrtimer(cpu_base, base, timer, &basenow);
}
}
}
lib\timerqueue.c文件中比较重要的3个工具函数:都是红黑树常规的操作!
/**
* timerqueue_add - Adds timer to timerqueue.
*
* @head: head of timerqueue
* @node: timer node to be added
*
* Adds the timer node to the timerqueue, sorted by the
* node's expires value.
*/
bool timerqueue_add(struct timerqueue_head *head, struct timerqueue_node *node)
{
struct rb_node **p = &head->head.rb_node;
struct rb_node *parent = NULL;
struct timerqueue_node *ptr;
/* Make sure we don't add nodes that are already added */
WARN_ON_ONCE(!RB_EMPTY_NODE(&node->node));
while (*p) {
parent = *p;
ptr = rb_entry(parent, struct timerqueue_node, node);
if (node->expires.tv64 < ptr->expires.tv64)
p = &(*p)->rb_left;
else
p = &(*p)->rb_right;
}
rb_link_node(&node->node, parent, p);
rb_insert_color(&node->node, &head->head);
if (!head->next || node->expires.tv64 < head->next->expires.tv64) {
head->next = node;
return true;
}
return false;
}
EXPORT_SYMBOL_GPL(timerqueue_add);
/**
* timerqueue_del - Removes a timer from the timerqueue.
*
* @head: head of timerqueue
* @node: timer node to be removed
*
* Removes the timer node from the timerqueue.
*/
bool timerqueue_del(struct timerqueue_head *head, struct timerqueue_node *node)
{
WARN_ON_ONCE(RB_EMPTY_NODE(&node->node));
/* update next pointer */
if (head->next == node) {
struct rb_node *rbn = rb_next(&node->node);
head->next = rbn ?
rb_entry(rbn, struct timerqueue_node, node) : NULL;
}
rb_erase(&node->node, &head->head);
RB_CLEAR_NODE(&node->node);
return head->next != NULL;
}
EXPORT_SYMBOL_GPL(timerqueue_del);
/**
* timerqueue_iterate_next - Returns the timer after the provided timer
*
* @node: Pointer to a timer.
*
* Provides the timer that is after the given node. This is used, when
* necessary, to iterate through the list of timers in a timer list
* without modifying the list.
*/
struct timerqueue_node *timerqueue_iterate_next(struct timerqueue_node *node)
{
struct rb_node *next;
if (!node)
return NULL;
next = rb_next(&node->node);
if (!next)
return NULL;
return container_of(next, struct timerqueue_node, node);
}
EXPORT_SYMBOL_GPL(timerqueue_iterate_next);
整个函数的调用过程:tick_program_event(注册clock_event_device)->hrtimer_inerrupt->__hrtimer_run_queues->__run_hrtimer
(4)定时器中最重要的莫过于执行回调函数了,研发人员设计了这么复杂的流程、结构体,最终的目的不就是为了在正确的时间执行正确的回调函数么? 由于回调函数是异步执行的,它类似于一种“软件中断”,而且是处于非进程的上下文中,所以回调函数有以下3点需要注意:
没有 current 指针、不允许访问用户空间。因为没有进程上下文,相关代码和被中断的进程没有任何联系。
不能执行休眠(或可能引起休眠的函数)和调度。
任何被访问的数据结构都应该针对并发访问进行保护,以防止竞争条件。
3、为什么hrtimer比timer_list精度高?
(1)低分辨率定时器的计时单位基于jiffies值的计数,也就是说,它的精度只有1/HZ。假如内核配置的HZ是1000,那意味着系统中的低分辨率定时器的精度就是1ms; 那么问题来了,为了提高精度,为啥不把HZ值设置地更大了?比如10000、1000000等?提高时钟中断频率也会产生副作用,中断频率越高,系统的负担就增加了,处理器需要花时间来执行中断处理程序,中断处理器占用cpu时间越多。这样处理器执行其他工作的时间及越少,并且还会打乱处理器高速缓存(进程切换导致地)。所以选择时钟中断频率时要考虑多方面,要取得各方面的折中的一个合适频率。
(2)大部分时间里,time wheel可以实现O(1)时间复杂度。但是当有进位发生时,不可预测的O(N)定时器级联迁移时间大大地影响了定时器的精度;刚好红黑树的增删改查时间复杂度可以控制在O(lgN),再加上硬件的计数进步,所以可以比较好地把精度控制在纳秒级别!
参考:
1、https://blog.csdn.net/droidphone/article/details/8051405 低分辨率定时器的原理
2、https://blog.csdn.net/hongzg1982/article/details/54881361 高精度定时器hrtimer的原理
3、https://cloud.tencent.com/developer/article/1603333?from=15425 内核低分辨率定时器的实现
4、https://zhuanlan.zhihu.com/p/83078387 高精度定时器原理简介
离线
页次: 1