0%

libevent笔记--定时事件分析

笔记重点:

  • 定时事件的处理依赖两个结构,一个是小根堆,一个是common_timeout的结构。
  • 小根堆根据超时时间进行排序,最近超时的为小根堆的根节点。
  • common_timeout_list结构是为了处理超时时间相同的一系列事件的集合,以防止小根堆随超时事件的增多而过多增加的情况,对common-timeout的理解是依据《管理超时event》这篇博客进行分析的。

一. common_timeout_list结构的理解

1. 使用common_timeout_list结构的理由

所以要在小根堆之外再次使用common-timeout的结构,是因为超时事件在小根堆中,是按照超时时间的先后次序进行一个排序,对于超时时间相同的情况,插入到小根堆只会让堆变大,而失去了堆排序的基本作用,所以这个时候,需要将相同的超时时间集合一起来,然后在放入到小根堆中的一个节点中。

2. 一个时间:

这个时间是在使用event_add函数时的第二个参数,common_timeout系统使用时时间做了特殊的处理,这里的struct timeval表示时间在windows下是64位,在linux下是32位,在common-timeout中,使用了32位中的21~32位的12位用来保存common-timeout中需要的额外信息,即12位中高4位全为1表示common-timeout时间,12位中的低8位表示在common-timeout结构中超时事件集合中本事件所在的索引位置

3. 一个结构:
1
2
3
4
5
6
7
8
9
10
struct common_timeout_list {
//相同的超时事件列表
struct event_list events;
//超时时间
struct timeval duration;
//放在小根堆上的超时事件
struct event timeout_event;
//所属的调度器
struct event_base *base;
};

这个结构就是在event_base中的表示一系列超时时间的集合。

4. 一个函数:
1
2
3
4
5
const struct timeval *
event_base_init_common_timeout(struct event_base *base,
const struct timeval *duration){
……
}

这个函数的作用就是使用一个时间,在调度器中创建common_timeout_list结构,并创建common_timeout_list结构中的timeout_event事件,并将给的时间的高32~21位初始化,就是说初始化上面的一个结构和一个时间。

5. 添加超时时间
  • eventadd====>event_add_nolock====>event_queue_insert_timeout====>insert_common_timeout_inorder
  • insert_common_timeout_inorder函数是从列表的最后开始搜索超时时间点小于当前事件时间点的节点,什么情况下有可能导致有大于当前事件时间点的节点?
  • eventadd_nolock中如果当前事件是事件队列的第一个事件,那么需要调用common_timeout_schedule函数,将common_timeout_list结构中的timeout_event事件放入小根堆中进行调度
6. 超时事件激活

common_timeout_callback函数
该函数是通过小根堆上的超时处理执行的回调,判断事件列表的第一个节点是否超时,如果超时则处理激活这个事件(就是放入激活队列)
删除这个事件,当队列中还有事件时,重新将common_timeout_list结构中的timeout_event事件放入到小根堆中监听。
注:小根堆中事件超时时是否会删除这个事件,所以common_timeout事件回调后还要重新加入到小根堆?

结论:
以上之所以我开始讲common_timeout_list结构,是因为当大量重复的的定时事件时,需要使用common_timeout_list将相同的事件集合起来,因为它们的超时时间是相同的,并且创建一个内部超时事件放到小根堆上,用来监听这个集合的超时,这样就不用把所有的事件加入到小根堆中,而用一个事件代替,这就是设计思路。

二. 定时器增删减

1.小根堆

要理解小根堆的原理,可以参考这篇笔记,这里要讲的是利用小根堆的特性,根节点的事件是最接近now的时间,也就是最先超时的。

2.添加定时事件到调度器

eventadd ====> event_add_nolock ====> eventqueue_insert_timeout
【event.c/3363】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void
event_queue_insert_timeout(struct event_base *base, struct event *ev)
{
……
//如果是common_timeout_list
if (is_common_timeout(&ev->ev_timeout, base)) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
//插入到common_timeout_list集合中
insert_common_timeout_inorder(ctl, ev);
} else {
//插入到小根堆中
min_heap_push_(&base->timeheap, ev);
}
}

这里需要注意的地方就是event_add_nolock
函数中,将定时的时间计算成绝对时间,也就是超时的时间点,然后在将超时事件加入到小根堆中。

3.定时事件的触发

定时事件的触发,是利用调度器的超时等待功能来实现的,首先计算小根堆中最先超时的事件需要等待的超时时间,然后传入到调度器的调度接口等待超时。接口超时完成以后,将超时的事件加入到激活队列中,等待执行。
【event.c】1878行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int event_base_loop(struct event_base *base, int flags)
{
……
while (!done) {
……
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
//计算最近超时事件所需的时间
timeout_next(base, &tv_p);
} else {
evutil_timerclear(&tv);
}
……
clear_time_cache(base);
//调度等待超时
res = evsel->dispatch(base, tv_p);
……
//将已经超时的事件加入到激活队列中
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
//激活队列事件执行
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
……
}

4.超时回调执行

event_process_active ====> event_process_active_signal_queue ====>event_persist_closure

  • 这里在阅读代码的时候要注意,timeout_process函数中,将超时事件从监听队列中删除了,然后将超时事件添加到激活队列。而在event_process_active_signal_queue函数中,对待定时回调事件(持久的超时事件),调用event_persist_closure函数在将超时事件加回到监听队列中。
  • 定时事件的处理中,还应该注意对于超时事件的处理,设定一定的超时时间,而在超时时间到来之前,事件已经得到处理,那么下次再处理的超时时间点是现在时间+超时时间,如果回调的原因是已经超时的时候,是超时时间点+超时时间,代码中并没有简单的用现在时间+超时时间进行处理。(套接字的读写超时时的处理)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    static inline void
    event_persist_closure(struct event_base *base, struct event *ev)
    {
    ……
    if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) {
    ……
    gettime(base, &now);
    if (is_common_timeout(&ev->ev_timeout, base)) {
    //在event_add的第二个参数的超时值
    delay = ev->ev_io_timeout;
    usec_mask = delay.tv_usec & ~MICROSECONDS_MASK;
    delay.tv_usec &= MICROSECONDS_MASK;
    if (ev->ev_res & EV_TIMEOUT) {
    relative_to = ev->ev_timeout;
    relative_to.tv_usec &= MICROSECONDS_MASK;
    } else {
    relative_to = now;
    }
    } else {
    //在event_add的第二个参数的超时值
    delay = ev->ev_io_timeout;
    if (ev->ev_res & EV_TIMEOUT) {
    //超时时用超时时间点
    relative_to = ev->ev_timeout;
    } else {
    //用当前时间
    relative_to = now;
    }
    }
    evutil_timeradd(&relative_to, &delay, &run_at);
    ……
    //使用绝对时间将持久时间加入到监听队列
    event_add_nolock_(ev, &run_at, 1);
    }
    ……
    }
    5.删除超时事件(2018-01-12)
    调用的路径如下:
    event_base_loop
    ====>timeout_process
             ====>event_del_nolock_
                  ====>event_queue_remove_timeout
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    static void
    event_queue_remove_timeout(struct event_base *base, struct event *ev)
    {
    EVENT_BASE_ASSERT_LOCKED(base);
    if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_TIMEOUT))) {
    event_errx(1, "%s: %p(fd "EV_SOCK_FMT") not on queue %x", __func__,
    ev, EV_SOCK_ARG(ev->ev_fd), EVLIST_TIMEOUT);
    return;
    }
    DECR_EVENT_COUNT(base, ev->ev_flags);
    ev->ev_flags &= ~EVLIST_TIMEOUT;

    if (is_common_timeout(&ev->ev_timeout, base)) {
    struct common_timeout_list *ctl = get_common_timeout_list(base, &ev->ev_timeout);
    TAILQ_REMOVE(&ctl->events, ev, ev_timeout_pos.ev_next_with_common_timeout);
    } else {
    //在这里删掉小根堆上
    min_heap_erase_(&base->timeheap, ev);
    }
    }