0%

游戏开发-epoll笔记

网上有视频和一些博客虽然对epoll的原理进行了讲解,但仍然有些不详细的地方,重新梳理epoll的实现,并整理笔记记录一下;参考的文章很多粗制滥造,所以现在看文章都非常注意了,介绍原理的文章中讲到了一个核心点,就是有poll,select,为什么还要有epoll,答案是提高应用程序的效率,此文章首先讲到了select的实现方式,归根结底将轮询socket的状态,通过增加另外的数据结构来优化这部分的阻塞。因此是增强版的poll和select。

参考文章:

1. epoll实现原理概述

从内核角度看,到达网卡的数据如何以优雅的姿态,通知应用层的程序,实现的方式有很多。epoll的实现就是通过一个中间层,将socket套接字和进程相关联,通过列表索引到相关的进程,通过红黑树索引相关的socket,以方便阻塞进程,将cpu出让给其他进程,当有数据要处理时又容易唤醒相关的进程,以方便处理数据。而这里唤醒进程的方式又导致epoll存在两种模式,一种是水平触发(LT),一种是边缘触发(ET);这两种模式差异是水平触发在于socket的缓冲区还有未处理的数据时放回到就绪队列中,重新出发处理事件。epoll的实现重要的三个数据结构分别是,epitem,这个是实际存放socket套接字,进程信息等的节点;eventpoll,这个是epoll的处理接口;rb_root_cached,这个是保存epitem的红黑树。

2. epoll重要的数据结构

2.1 epitem结构说明

接下来通过分析linux内核中核心的数据结构和算法,简单描述epoll的实现。
epitem结构用来关联socket以及socket关心的事件,所相关的进程信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 列出几个重要的字段
struct epitem {
// ......
// 指向socket套接字(套接字也可以理解为就是一个文件描述符)
struct epoll_filefd ffd;
// ......

// 包含所有的等待列表
struct eppoll_entry *pwqlist;

// 所属的eventpoll结构
// 这里是每一个epoll程序都有eventpoll结构题
struct eventpoll *ep;

// 所属的文件集合(列表)
struct hlist_node fllink;

// 关联的进程-唤醒的就是这个进程(这里是进程的引用)
struct wakeup_source __rcu *ws;

// 这一个socket当前感兴趣的事件
struct epoll_event event;
};
2.2 eventpoll数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct eventpoll {
// 当前数据结构的互斥锁
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
// epoll_wait调用查询的队列
wait_queue_head_t wq;

/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;

// 这就是就绪队列
struct list_head rdllist;

// rdllist和ovflist的锁
rwlock_t lock;

// 红黑树-主要作用是通过socket索引到epitem
struct rb_root_cached rbr;

// epitem链表(包含当前eventpoll的所有epitem)
struct epitem *ovflist;
// ......
};

3. epoll的核心算法

3.1 创建eventpoll(epoll_create系统调用)

这里最主要的是创建并返回了eventpoll的句柄。这个eventpoll结构体主要是管理epitem的。以下函数创建一个eventpoll结构并关联到一个文件句柄上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;

/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
// 创建eventpoll结构体
error = ep_alloc(&ep);
if (error < 0)
return error;
// 创建一个文件描述符,这个描述符在下面要与file和eventpoll信息相关联
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
// 文件句柄和eventpoll相关联
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
// 成功的话返回句柄
// 这里eventpoll管理file
ep->file = file;
// file安装到文件句柄上
fd_install(fd, file);
return fd;

out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_clear_and_put(ep);
return error;
}

// 系统调用epoll_create-实际调用上面的函数
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;

return do_epoll_create(0);
}

3.2 epoll_ctl系统调用

这个系统调用主要是针对socket(fd)增加不同的监听事件,然后将这个fd增加到红黑树里,以便能够快速查询到。
以下的代码通过eventpoll结构体,mutex来锁住红黑树,对红黑树进行操作。
主要是三种操作EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct eventpoll *tep = NULL;

error = -EBADF;
f = fdget(epfd);
if (!f.file)
goto error_return;

/* Get the "struct file *" for the target file */
tf = fdget(fd);
if (!tf.file)
goto error_fput;

// ......
// 通过epfd句柄找到file里的私有数据就是eventpoll结构体
ep = f.file->private_data;
// ......
// 锁住红黑树结构体
error = epoll_mutex_lock(&ep->mtx, 0, nonblock);
// ......

// 尝试在红黑树中找到这个socket代表的epitem
epi = ep_find(ep, tf.file, fd);

error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 默认增加(EPOLLERR | EPOLLHUP)事件
epds->events |= EPOLLERR | EPOLLHUP;
// ep_insert主要是向红黑树中插入节点
error = ep_insert(ep, epds, tf.file, fd, full_check);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi) {
// 红黑树中删除节点
ep_remove_safe(ep, epi);
error = 0;
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
// 编辑节点
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
} else
error = -ENOENT;
break;
}
// 解锁
mutex_unlock(&ep->mtx);

error_tgt_fput:
if (full_check) {
clear_tfile_check_list();
loop_check_gen++;
mutex_unlock(&epnested_mutex);
}

fdput(tf);
error_fput:
fdput(f);
error_return:

return error;
}

// 系统调用epoll_ctl-调用上面的函数
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct epoll_event epds;

if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;

return do_epoll_ctl(epfd, op, fd, &epds, false);
}

3.3 epoll_wait系统调用

主要调用了ep_poll来查询就绪队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 列出这个函数主要是向说明epoll_wait调用主要是调用了ep_poll函数(接下来会列出来)
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, struct timespec64 *to)
{
int error;
struct fd f;
struct eventpoll *ep;
// ......
ep = f.file->private_data;

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, to);

error_fput:
fdput(f);
return error;
}

// epoll_wait系统调用-调用了上面的函数
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
struct timespec64 to;

return do_epoll_wait(epfd, events, maxevents,
ep_timeout_to_timespec(&to, timeout));
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, struct timespec64 *timeout)
{
int res, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;

lockdep_assert_irqs_enabled();
// ......

// ep_events_available查询eventpoll结构体rdllist上是否有有效的事件
eavail = ep_events_available(ep);

while (1) {
if (eavail) {
// 如果有效的事件则返回给用户层
// 否则就等待超时
// 这个函数的实现有点长,实际通过函数名可以看出就是找出所有的事件给用户。
res = ep_send_events(ep, events, maxevents);
if (res)
return res;
}

if (timed_out)
return 0;

// ......
}
}

// 这个函数主要是查询ep结构的rdllist列表中是否有就绪事件
static inline int ep_events_available(struct eventpoll *ep)
{
return !list_empty_careful(&ep->rdllist) ||
READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}

结语: epoll的两个关键点未在文中详细描述,一个是中断程序是如何更改就绪队列的,另外一个就是有事件在就绪队列中用户层获得的事件(从ep_send_events获得的实现细节)。第一个问题就是do_epoll_ctl函数的实现细节,将感兴趣的套接字的事件插入到红黑树中,这个是ep_insert函数中的细节,另外是唤醒操作,就是将就绪的节点插入到就绪队列中,这样do_epoll_wait就能在阻塞后能够要么超时返回,要么立即就能返回已有的套接字的事件信息,为后续操作提供基础。