最近,异想天开,想用D实现一个web服务器(似乎已经想这件事好久了,只不过之前是C++),自然而然得开始研究epoll。早就听说过epoll的大名,只不过网上的教程似乎没多少,并且感觉也没怎么把用法给讲完整。好在,通过几天的学习,也算是有所积累,因此想通过这篇post记录下,尽量把细节给讲清楚,希望它对各位有所价值。
Linux平台也有异步IO,比较通用的是POSIX AIO,只不过这货就是新开个线程来处理IO罢了,比较适合用来处理相对耗时的磁盘IO。
同属IO复用,除了epoll,我们也能选择select和poll,之间的性能比较需要视场景而定,通常对于Web服务这种场景,epoll会更加适合,若想深究,请大家阅读一下源码,自然明了,也就2~3KLoC。
Changelog
- [2017-07-05] 一些格式的调整以及错误修正
- [2015-01-24] 添加“epoll的使用模式”一节
- [2015-08-13] 补充EPOLLHUP事件细节
0x00 epoll函数接口
创建epoll实例
int epoll_create1(int flags);
函数参数:
- flags: 当前版本只支持EPOLL_CLOEXEC标志(请注意不支持EPOLL_NONBLOCK标志)
其实我们也能够通过epoll_create(int size)这个函数来创建epoll实例,只不过这个函数中的size在2.6.27内核开始就不必要了,原因请看如下代码片段:
SYSCALL_DEFINE1(epoll_create, int, size) {
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
根据惯例,如果返回-1,则标志出现了问题,我们可以读取errno来定位错误,有如下errno会被设置:
- EINVAL : 无效的标志
- EMFILE : 用户打开的文件超过了限制
- ENFILE : 系统打开的文件超过了限制
- ENOMEM : 没有足够的内存完成当前操作
管理epoll事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数参数:
- epfd : epoll实例的fd
- op : 操作标志,下文会描述
- fd : 监控对象的fd
- event : 事件的内容,下文描述
op可以有3个值,分别为:
- EPOLL_CTL_ADD : 添加监听的事件
- EPOLL_CTL_DEL : 删除监听的事件
- EPOLL_CTL_MOD : 修改监听的事件
event是一个如下结构体的一个实例:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
其中,data是一个联合体,能够存储fd或其它数据,我们需要根据自己的需求定制。events表示监控的事件的集合,是一个状态值,通过状态位来表示,可以设置如下事件:
- EPOLLERR : 文件上发上了一个错误。这个事件是一直监控的,即使没有明确指定
- EPOLLHUP : 文件被挂断。这个事件是一直监控的,即使没有明确指定
- EPOLLRDHUP : 对端关闭连接或者shutdown写入半连接
- EPOLLET : 开启边缘触发,默认的是水平触发,所以我们并未看到EPOLLLT
- EPOLLONESHOT : 一个事件发生并读取后,文件自动不再监控
- EPOLLIN : 文件可读
- EPOLLPRI : 文件有紧急数据可读
- EPOLLOUT : 文件可写
- EPOLLWAKEUP : 如果EPOLLONESHOT和EPOLLET清除了,并且进程拥有CAP_BLOCK_SUSPEND权限,那么这个标志能够保证事件在挂起或者处理的时候,系统不会挂起或休眠
注意一下,EPOLLHUP并不代表对端结束了连接,这一点需要和EPOLLRDHUP区分。通常情况下EPOLLHUP表示的是本端挂断,造成这种事件出现的原因有很多,其中一种便是出现错误,更加细致的应该是和RST联系在一起,不过目前相关文档并不是很全面,本文会进一步跟进。
根据惯例,如果返回-1,则标志出现了问题,我们可以读取errno来定位错误,有如下errno会被设置:
- EBADF : epfd或者fd不是一个有效的文件描述符
- EEXIST : op为EPOLL_CTL_ADD,但fd已经被监控
- EINVAL : epfd是无效的epoll文件描述符
- ENOENT : op为EPOLL_CTL_MOD或者EPOLL_CTL_DEL,并且fd未被监控
- ENOMEM : 没有足够的内存完成当前操作
- ENOSPC : epoll实例超过了/proc/sys/fs/epoll/max_user_watches中限制的监听数量
等待epoll事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
函数参数:
- epfd : epoll实例的fd
- events : 储存事件的数组首地址
- maxevents : 最大事件的数量
- timeout : 等待的最长时间
函数返回就绪事件的数量,如果返回-1,则标志出现了问题,我们可以读取errno来定位错误,有如下errno会被设置:
- EBADF : epfd不是一个有效的文件描述符
- EFAULT : events指向的内存无权访问
- EINTR : 在请求事件发生或者过期之前,调用被信号打断
- EINVAL : epfd是无效的epoll文件描述符
0x01 关于水平触发和边缘触发
用英文来表示,水平触发为Level Trigger,边缘触发为Edge Trigger,不过很多文章也将LT翻译为条件触发,有点搞不清为何这么翻译。
那么为什么在这里突兀得提及ET和LT呢?是这样的,想必各位应该已经注意到EPOLLET了,这个就代表ET事件,而epoll默认采取的是LT,也就是说在能够正确使用epoll之前,我们必须弄明白ET和LT,尤其是准备直接使用nonblocking和ET的朋友。
LT和ET原本应该是用于脉冲信号的,可能用它来解释更加形象。Level和Edge指的就是触发点,Level为只要处于水平,那么就一直触发,而Edge则为上升沿和下降沿的时候触发。听起来到时挺玄乎的,那么怎么区分这个Level和Edge呢?很简单,0->1这种类型的事件就是Edge,而Level则正好相反,1->1这种类型就是,由此可见,当缓冲区有数据可取的时候,ET会触发一次事件,之后就不会再触发,而LT只要我们没有取完缓冲区的数据,就会一直触发。
为了加深大家的印象,我们用个段子来描述:
- LT 水平触发
- 儿子:“妈妈,我收到了5000元压岁钱。”
- 妈妈:“恩,省着点花!”
- 儿子:“妈妈,我今天买了个ipad,花了3000元。”
- 妈妈:“噢,这东西真贵。”
- 儿子:“妈妈,我今天买好多吃的,还剩1000元。”
- 妈妈:“用完了这些钱,我可不会再给你了。”
- 儿子:“妈妈,那1000元我没花,零花钱够用了。”
- 妈妈:“恩,这才是明智的做法!”
- 儿子:“妈妈,那1000元我没花,我要攒起来。”
- 妈妈:“恩,加油!”
是不是没完没了?只要儿子手中还有钱,他就会一直汇报,这就是LT模式。有钱就是1,没钱就是0,那么只要儿子还有钱,这种事件就是1->1类型事件,自然是LT。
- ET 边缘触发
- 儿子:“妈妈,我收到了5000元压岁钱。”
- 妈妈:“恩,省着点花!”
- 儿子:“……”
- 妈妈:“你倒是说话啊?压岁钱呢?!”
这个就是ET模式,简洁得有点过头,但很高效!虽然妈妈可能并不这么认为。。。儿子从没钱到有钱,是一个0->1的过程,因此为ET。儿子和妈妈说过自己拿到了压岁钱就完事了,至于怎么花钱,还剩多少钱,一概不说,有钱就是这么任性!
我们将上述的儿子换做缓冲区,而钱换成数据,那么就是epoll中的ET和LT了,所以说编程也是源自生活的。还有一点需要强调ET模式只能应用于设置了O_NONBLOCK的fd,而LT则同时支持同步和异步。使用得当ET效率比LT高,但是LT更加易用,不容易出错。
0x02 epoll的使用模式
解释了这个多,我们应该怎么来用epoll呢?简单的几个函数,用起来可着实不轻松。好在,这里有一个大概的模式供大家参考,如下为伪代码:
epfd = epoll_init1(0);
event.events = EPOLLET | EPOLLIN;
event.data.fd = serverfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, serverfd, &event);
// 主循环
while(true) {
// 这里的timeout很重要,实际使用中灵活调整
count = epoll_wait(epfd, &events, MAXEVENTS, timeout);
for(i = 0; i < count; ++i) {
if(events[i].events & EPOLLERR || events[i].events & EPOLLHUP)
// 处理错误
if(events[i].data.fd == serverfd)
// 为接入的连接注册事件
else if(events[i].events & EPOLLIN)
// 处理可读的缓冲区
read(events[i].data.fd, buf, len);
event.events = EPOLLET | EPOLLOUT;
event.data.fd = events[i].data.fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, events[i].data.fd, &event);
else
// 处理可写的缓冲区
write(events[i].data.fd, buf, len);
// 后续可以关闭fd或者MOD至EPOLLOUT
}
}
使用上述的框架,我们可以完成很多事情,但是内部的细节,比如错误处理,信号处理等,还是不能大意,需要完善。
0x03 epoll实例 —— 啰嗦的echo man
接下来,让我们来看个示例吧。这只是一个hello world级别的代码,无论是你发送什么数据给它,它只会回复“it's echo man”。使用的是ET模式,相信对于大家应该有些许参考价值。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/epoll.h>
#include <string.h>
#define MAXEVENTS 64
int create_and_bind (int port) {
int sfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(sfd == -1) {
return -1;
}
struct sockaddr_in sa;
bzero(&sa, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sfd, (struct sockaddr*)&sa, sizeof(struct sockaddr)) == -1) {
return -1;
}
return sfd;
}
int make_socket_non_blocking (int sfd) {
int flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1) {
return -1;
}
if(fcntl (sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
return -1;
}
return 0;
}
/* 此函数用于读取参数或者错误提示 */
int read_param(int argc, char *argv[]) {
if (argc != 2) {
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
exit (EXIT_FAILURE);
}
return atoi(argv[1]);
}
int main (int argc, char *argv[]) {
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;
int port = read_param(argc, argv);
/* 创建并绑定socket */
sfd = create_and_bind (port);
if (sfd == -1) {
perror("create_and_bind");
abort ();
}
/* 设置sfd为非阻塞 */
s = make_socket_non_blocking (sfd);
if (s == -1) {
perror("make_socket_non_blocking");
abort ();
}
/* SOMAXCONN 为系统默认的backlog */
s = listen (sfd, SOMAXCONN);
if (s == -1) {
perror ("listen");
abort ();
}
efd = epoll_create1 (0);
if (efd == -1) {
perror ("epoll_create");
abort ();
}
event.data.fd = sfd;
/* 设置ET模式 */
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1) {
perror ("epoll_ctl");
abort ();
}
/* 创建事件数组并清零 */
events = calloc (MAXEVENTS, sizeof event);
/* 开始事件循环 */
while (1) {
int n, i;
n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++) {
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
/* 监控到错误或者挂起 */
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}
if(events[i].events & EPOLLIN) {
if (sfd == events[i].data.fd) {
/* 处理新接入的socket */
while (1) {
struct sockaddr_in sa;
socklen_t len = sizeof(sa);
char hbuf[INET_ADDRSTRLEN];
int infd = accept (sfd, (struct sockaddr*)&sa, &len);
if (infd == -1) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
/* 资源暂时不可读,再来一遍 */
break;
} else {
perror ("accept");
break;
}
}
inet_ntop(AF_INET, &sa.sin_addr, hbuf, sizeof(hbuf));
printf("Accepted connection on descriptor %d "
"(host=%s, port=%d)\n", infd, hbuf, sa.sin_port);
/* 设置接入的socket为非阻塞 */
s = make_socket_non_blocking (infd);
if (s == -1) abort ();
/* 为新接入的socket注册事件 */
event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1) {
perror ("epoll_ctl");
abort ();
}
}
//continue;
} else {
/* 接入的socket有数据可读 */
while (1) {
ssize_t count;
char buf[512];
count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1) {
if (errno != EAGAIN) {
perror ("read");
close(events[i].data.fd);
}
break;
} else if (count == 0) {
/* 数据读取完毕,结束 */
close(events[i].data.fd);
printf ("Closed connection on descriptor %d\n", events[i].data.fd);
break;
}
/* 输出到stdout */
s = write (1, buf, count);
if (s == -1) {
perror ("write");
abort ();
}
event.events = EPOLLOUT | EPOLLET;
epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
}
}
} else if((events[i].events & EPOLLOUT) && (events[i].data.fd != sfd)) {
/* 接入的socket有数据可写 */
write(events[i].data.fd, "it's echo man\n", 14);
event.events = EPOLLET | EPOLLIN;
epoll_ctl(efd, EPOLL_CTL_MOD, events[i].data.fd, &event);
}
}
}
free (events);
close (sfd);
return EXIT_SUCCESS;
}
我们可以通过ncat命令和它聊天:
[codesun@lucode ~]$ ncat 127.0.0.1 8000
hello
it's echo man
ncat和echo_man通信的时候其实用的是长连接(除非我们自己CTRL+C)。对于长连接这种东西,需要一定的处理策略。一般而言,我们会采用如下几种策略来处理:
- 心跳或者超时
- 特殊字符,标记数据传输完毕
- 协议中添加length,这个比较常规
0x04 使用建议
本段是本文发布2年后补充的,是本人的一些使用经验。
- 使用epoll一定要加定时器,否则后患无穷
- 如果多个线程观察的fd相同(通常是server socket fd),据说epoll_wait会有惊群问题(accept那个问题早就解决了),但我暂时没有发现
- 联合体data中的那个ptr是很有用的,只不过这就意味着你将该对象的生命周期交给了epoll,不排除会有潜在bug的影响,需要辅以timeout
- 多线程环境下使用epoll,多考虑EPOLLONESHOT
- EPOLLLT也是一个不错的选择,除非你的框架能够确保每次事件触发后,都读/写至EAGAIN
- epoll和kqueue很像,可以通过封装统一二者,虽然后者看似更加强大,其实IOCP也可统一,只不过这样的代价很大
- 使用前请仔细阅读man 7 epoll,勿做傻事
0xFF 总结
Linux的epoll用法看似简单,但是内部包含的细节和坑是难以想象的,还是需要各位去一一“品味”。
若想深入了解,还是建议阅读源码,切勿做管中窥豹之人。
epoll和select/poll本质上是一样的,性能上提升较大,都属于I/O多路复用模型(I/O Multiplexing Model),和堵塞,非堵塞两大I/O模型是并列的。只是对于事件处理函数角度而言,看起来是异步的(实际上是同步的回调而已)。和epoll组合使用大多数是非堵塞I/O方式,这时候就是同步非堵塞,也可以有堵塞I/O的情况。比如多线程server可以在accept上采用堵塞I/O,accept后的socket用epoll管理非堵塞的读写事件。
嘿嘿,谢谢补充!
关于ET和LT的段子很形象,赞! :)
AIO只有windows才有吧,linux没有这个。
很明显是有的
Windows特有的是IOCP,AIO在Linux下有2套实现
对epoll_wait函数的说明:“如果函数返回获得的时间的数量”中的“时间”应该是“事件”吧(因为后面要传时间,有时候还是有点误解)。而且,如果两字也不要写比较好。
确实是我不小心写错了,已修正。
`count` 是 `ssize_t` 类型, 后面用 `count == -1` OK 吗?
在x86_64/amd64平台上,ssize_t可以理解为int64_t,具体细节你看下头文件中的typedef。
你这个例子好像是tcp/ip网络编程里的,是吗?
是的,然后呢?