poll, select, pselect6, ppoll系统调用
select与poll
系统调用号
poll为7,select为23。
函数原型
内核接口
asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds, int timeout);
asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp);
glibc封装
poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
select
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
简介
select与poll都是为了实现IO多路复用的功能。
一般来说,对硬盘上的文件的读取都不会阻塞。但是,对管道、套接字、伪终端等文件的读取,是可能产生阻塞的。举个例子来说,如果我们读取stdin:
int fd = STDIN_FILENO; // stdin
char buf[64];
read(fd, buf, 64); // blocks here
process_read_content(buf);
那么,在执行read时,如果我们一直不向终端输入,那么这里会始终阻塞着,程序永远不会执行到之后的process_read_context(buf)。在这种情况下,这种行为是符合逻辑的,因为我们之后的语句是依赖读入的内容buf的。所以除非我们收到了buf的内容,否则就不应该执行之后的指令。
但是,如果有多个文件需要读入,就产生了问题。假设我们有一个nfd个元素的文件描述符数组fds,我们需要对他们读入,并彼此独立地分别处理每个读入的内容。
-
方案一
void process_fds(int *fds, int nfd) { for (int i = 0; i < nfd; i++) { char buf[64]; read(fds[i], buf, 64); process_read_content(buf); } }这个方案能完成我们的需求,但是效率实在是太低了。由于是按顺序依次处理读入的内容,如果
fds[0]始终没有输入,但是fds[1]早就有了输入。我们明明可以先处理fds[1]的输入的,但是由于进程阻塞在了fds[0]的read操作中,我们的时间就这样被白白浪费了。 -
方案二
既然每个文件描述符处理读入是互相独立的,我们就可以创建
nfd个线程,每个线程中处理其读入。这种方案确实可以解决我们方案一中的问题,但是线程的创建、线程之间的切换是非常耗费时间的。
为了更高效地解决这个问题,我们可以增加一个新的操作——判断某个文件描述符是否可以读入。我们可以遍历文件描述符,判断是否有已经可以读入的,如果有的话就直接处理,如果没有的话就再次遍历。这样几乎没有耗费的时间。
我们甚至可以想出更高效的方案,在主线程中查询是否有可以读取的文件描述符,然后把可以读取的文件描述符给别的线程执行。
select就为我们提供了一个类似的解决方案。我们给select传入需要检测IO状态(可以读入、可以写入等)的文件描述符集合,select立即返回,告诉我们哪些文件描述符的IO已经准备就绪。
poll的功能和select类似,但解决了一些select的缺点。具体请见下面的用法一节。
用法
select
select的函数签名为
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
readfds, writefds和exceptfds是文件描述符集合,分别用于:
-
readfds已经准备好供读取的文件描述符集合,即
read操作不会阻塞。 -
writefds已经准备好供写入的文件描述符集合,即
write操作不会阻塞。 -
exceptfds其余条件的文件描述符集合。包括:
- TCP套接字有带外数据
- 处于包模式下的伪终端的主端检测到从端的状态变化
cgroup.events文件被修改
如果对相应的状态变化不感兴趣,在对应的参数中传递NULL即可。
我们可以用以下几个函数操作fd_set类型的变量:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
-
FD_ZERO将
set清空 -
FD_SET将
fd放入set中 -
FD_CLR将
fd从set中移除 -
FD_ISSET判断
fd是否处于set中
nfds为readfds, writefds, exceptfds中,数值最大的文件描述符加1。如readfds包含文件描述符4, 6, 7,writefds包含文件描述符5,exceptfds为空,则nfds为8。
timeout为超时参数,其结构为
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};
如果timeout指针为NULL,则select将一直等待,直到有一个文件描述符准备好。如果tv_sec和tv_usec均为0,则select将立即返回。否则,select如果等待达到timeout的时间,还没有任何文件描述符准备好,就返回。
当函数返回之后,会有如下变化:
- 返回值为
readfds,writefds,exceptfds中准备好的文件描述符的总数 readfds,writefds,exceptfds中会只保留已经处于准备好状态的文件描述符。我们可以通过FD_ISSET去查看哪些文件描述符准备好。(正因如此,如果我们在一个循环中使用select,那在每次使用之前,需要复制一遍各集合,或用FD_CLR清空后重新添加)select可能会更新timeout参数。
综上,如果我们要用select,按第三个方案来实现我们的功能,其写法为
void process_fds(int *fds, int nfd) {
fd_set rset;
FD_ZERO(&rset);
int maxfd = -1;
for (int i = 0; i < nfd; i++) {
FD_SET(fds[i], &rset);
if (fds[i] > maxfd) {
maxfd = fds[i];
}
}
while (1) {
fd_set tmp_rset;
memcpy(&tmp_rset, &rset, sizeof(fd_set));
if (select(maxfd + 1, &tmp_rset, NULL, NULL, NULL) <= 0) {
break;
}
for (int i = 0; i < nfd; i++) {
if (FD_ISSET(fds[i], &tmp_rset)) {
FD_CLR(fds[i], &rset);
char buf[64];
read(fds[i], buf, 64);
process_read_content(buf);
}
}
}
}
此外,值得注意的是,glibc的封装要求我们的文件描述符的值不能超过FD_SETSIZE,也就是1024。
poll
poll的函数签名为
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
与select不同的是,它并不是把文件描述符放在fd_set结构体中,而是放在一个struct pollfd类型组成的数组中,nfds为该数组的长度。
struct pollfd的定义为
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
fd是文件描述符,events是用户感兴趣的事件(类似于select中的readfds, writefds和exceptfds),由用户填写;revents是实际发生的事件,由内核填写。
events与revents是位掩码,其可以包含的标志位有
-
POLLIN:存在数据可以读入(相当于select中的readfds) -
POLLPRI:存在其他条件满足(相当于select中的exceptfds) -
POLLOUT:存在数据可以写入(相当于select中的writefds) -
POLLRDHUP流套接字对端关闭连接。
需定义
_GNU_SOURCE宏。 -
POLLERR出错。
只可由
revents包含,不可由events包含 -
POLLHUP挂起。
只可由
revents包含,不可由events包含 -
POLLNVAL由于
fd未打开,请求无效。只可由
revents包含,不可由events包含
当events为0时,revents只可返回POLLERR, POLLHUP和POLLNVAL(将events置为0类似于select中的FD_CLR)。若其不为0,则可以返回events中包含的事件,以及POLLERR, POLLHUP和POLLNVAL。如果返回的revents为0,则表示什么都没发生,可能超时了,或者别的文件描述符中发生了用户感兴趣的事。
timeout参数表示其最多等待时间(以毫秒为单位)。如果其为负,则表示poll无限等待;如果其为0,则表示poll立即返回。
如果在超时范围内,任何一个用户感兴趣的事件发生了,poll将会返回,返回值为产生用户感兴趣事件的文件描述符个数;如果超时了,没有任何一个用户感兴趣的事件发生,则poll将会返回0。
综上,如果我们要用poll,按第三个方案来实现我们的功能,其写法为
void process_fds(int *fds, int nfd) {
struct pollfd *pollfds = (struct pollfd *)malloc(nfd * sizeof(struct pollfd));
for (int i = 0; i < nfd; i++) {
pollfds[i].fd = fds[i];
pollfds[i].events = POLLIN;
}
while (1) {
if (poll(pollfds, nfd, -1) <= 0) {
break;
}
for (int i = 0; i < nfd; i++) {
if (pollfds[i].revents & POLLIN) {
pollfds[i].events = 0;
char buf[64];
read(fds[i], buf, 64);
process_read_content(buf);
}
}
}
}
与select不同的是,其可以包含的文件描述符个数无上限。
根据上述的讨论,poll与select的区别在于
poll文件描述符个数无上限,select文件描述符其值上限为FD_SETSIZEpoll感兴趣的事件种类更多poll不需要在每次调用前都复制一遍fd_set,也就是poll不会改变传入的fds。poll超时参数精度为毫秒,select超时参数精度为微秒,select更精确。
实现
select
首先我们来看看fd_set与和其相关的函数是怎么实现的。在Linux内核的include/linux/types.h中可以看到
typedef __kernel_fd_set fd_set;
而在include/uapi/linux/posix_types.h中可以看到
#define __FD_SETSIZE 1024
typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;
所以,这其实就是一个长度为1024字节的位数组。同时我们也明白了,为什么select要求文件描述符的值不能超过FD_SETSIZE。
同时,我们也可以在glibc的源码misc/sys/select.h中看到和其相关的函数的定义
#define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp) __FD_ZERO (fdsetp)
其实现则位于bits/select.h:
#define __FD_ZERO(s) \
do { \
unsigned int __i; \
fd_set *__arr = (s); \
for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i) \
__FDS_BITS (__arr)[__i] = 0; \
} while (0)
#define __FD_SET(d, s) \
((void) (__FDS_BITS (s)[__FD_ELT(d)] |= __FD_MASK(d)))
#define __FD_CLR(d, s) \
((void) (__FDS_BITS (s)[__FD_ELT(d)] &= ~__FD_MASK(d)))
#define __FD_ISSET(d, s) \
((__FDS_BITS (s)[__FD_ELT (d)] & __FD_MASK (d)) != 0)
简单来说,就是:
FD_ZERO将整个位数组清0(不用memset的原因是,这可能需要在之前声明memset的原型,并且这个数组其实并不大)FD_SET将该文件描述符对应的比特位置1FD_CLR将该文件描述符对应的比特位置0FD_ISSET判断该文件描述符对应的比特位是否为1
接着,我们来看看select内部的实现。其实现均位于Linux内核源码的fs/select.c文件中。
首先,在core_sys_select函数里,使用了一个fd_set_bits的结构体,其定义为:
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;
一共六个位数组,前三个是存储我们传入的参数的,后三个存储结果,在最终再复制进前三个中。
select的实现,最主要的就是do_select函数,其内容非常长,但也十分重要:
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table);
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
f = fdget(i);
if (f.file) {
wait_key_set(wait, in, out, bit,
busy_flag);
mask = vfs_poll(f.file, wait);
fdput(f);
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;
/*
* only remember a returned
* POLL_BUSY_LOOP if we asked for it
*/
} else if (busy_flag & mask)
can_busy_loop = true;
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait->_qproc = NULL;
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}
poll_freewait(&table);
return retval;
}
fd_set_bits类型的fds,其表示输入的字段复制于系统调用的输入,其表示输出的字段在调用前被清空。
其主体部分为两层嵌套的循环。在最外层循环中,每一轮循环处理BITS_PER_LONG,也就是一般来说64个文件描述符。这是因为我们的数据是用long的位数组来存储的,所以这样分批次效率更高。在内循环中,我们遍历这个long整型的每个字节,其每个字节对应一个文件描述符。
也就是说,我们从0开始,一直到我们传入系统调用的参数n,也就是最大的文件描述符的值,遍历每个文件描述符,在在53行看到,通过bit & all_bits,判断当前的文件描述符是否在我们之前传入的in, out或ex集合中。对于存在集合中的,最终调用了vfs_poll来查询单个文件的状态,其实现位于fs/poll.h中:
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
if (unlikely(!file->f_op->poll))
return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);
}
依旧是函数指针模拟多态。
最终,再把res_in, res_out, res_ex复制回原本的in, out, ex即可。
总的来说,select的步骤是,对于输入的参数nfds,把值从0到nfds - 1的所有相关的文件描述符都查询一遍,对于每个文件描述符,调用vfs_poll查询状态。
poll
poll的实现位于fs/select.c,其核心do_poll依然很长,但也十分重要:
static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time)
{
poll_table* pt = &wait->pt;
ktime_t expire, *to = NULL;
int timed_out = 0, count = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
/* Optimise the no-wait case */
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
pt->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
for (;;) {
struct poll_list *walk;
bool can_busy_loop = false;
for (walk = list; walk != NULL; walk = walk->next) {
struct pollfd * pfd, * pfd_end;
pfd = walk->entries;
pfd_end = pfd + walk->len;
for (; pfd != pfd_end; pfd++) {
/*
* Fish for events. If we found one, record it
* and kill poll_table->_qproc, so we don't
* needlessly register any other waiters after
* this. They'll get immediately deregistered
* when we break out and return.
*/
if (do_pollfd(pfd, pt, &can_busy_loop,
busy_flag)) {
count++;
pt->_qproc = NULL;
/* found something, stop busy polling */
busy_flag = 0;
can_busy_loop = false;
}
}
}
/*
* All waiters have already been registered, so don't provide
* a poll_table->_qproc to them on the next loop iteration.
*/
pt->_qproc = NULL;
if (!count) {
count = wait->error;
if (signal_pending(current))
count = -ERESTARTNOHAND;
}
if (count || timed_out)
break;
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
timed_out = 1;
}
return count;
}
传入的参数list的类型是struct poll_list的指针:
struct poll_list {
struct poll_list *next;
int len;
struct pollfd entries[0];
};
也就是说,这里是一个链表。其entries字段是一个变长数组。我们传入do_poll的list参数是将传入系统调用的fds,也就是由nfds个struct pollfd类型的实例组成的数组,在之前的do_sys_poll函数中,被分成长度为POLLFD_PER_PAGE的若干个部分,然后再将每个部分用链表串联起来。这样做的原因应该就是保证每次处理的一批不会超过页大小,尽量减少换页。
在do_poll函数中我们可以看到,对每个链表项,遍历了其entries数组,也就相当于对我们传入的fds进行遍历。对每一个文件描述符,使用do_pollfd(pfd, pt, &can_busy_loop, busy_flag)来进行真正的poll操作,而do_pollfd,实际上就是调用的vfs_poll。
pselect6与ppoll
系统调用号
pselect6为270,ppoll为271。
函数签名
内核接口
asmlinkage long sys_pselect6(int, fd_set __user *, fd_set __user *, fd_set __user *, struct __kernel_timespec __user *, void __user *);
asmlinkage long sys_ppoll(struct pollfd __user *, unsigned int, struct __kernel_timespec __user *, const sigset_t __user *, size_t);
glibc封装
pselect6
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);
ppoll
#define _GNU_SOURCE
#include <signal.h>
#include <poll.h>
int ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *tmo_p, const sigset_t *sigmask);
简介
pselect与select的区别主要在于:
- 超时精度
select使用的是struct timeval结构体,精确到微秒pselect使用的是struct timespec结构体,精确到纳秒
- 超时参数
select可能会更新其超时参数timeoutpselect6系统调用可能会更新其超时参数,glibc的封装pselect不会更新其超时参数
- 信号掩码
pselect可以设置信号掩码,若其为NULL,则行为与select相同
ppoll与poll的区别主要在于:
- 超时精度
poll使用的超时精度为毫秒ppoll使用的是struct timespec结构体,精确到纳秒
- 信号掩码
ppoll可以设置信号掩码,若其为NULL,则行为与poll相同
pselect与ppoll可以看作执行select或poll前后设置信号掩码。之所以需要这两个单独的系统调用,是因为如果我们的需求是,要么接收到特定的信号,要么某个文件描述符准备好,然后执行后续的操作。如果我们分为两步操作,但是接受信号实际上是在判断是否接收到信号之后,也就是判断结果为没接收到信号,而且在调用select之前,那么就可能陷入无限等待。pselect与ppoll进行信号判断的时候则是使用原子操作,所以不会产生这样的竞争条件。
实现
pselect6与ppoll在Linux内核中的实现与select和poll类似。有一点需要说明的是,pselect6接受的最后一个参数是void *类型的,这是因为它实际上需要的类型为
struct {
const kernel_sigset_t *ss; /* Pointer to signal set */
size_t ss_len; /* Size (in bytes) of object
pointed to by 'ss' */
};
本来其实可以把这两个字段变成两个函数的参数的,但由于Linux x86_64的ABI要求系统调用至多只能接受6个参数,所以最后一个参数只能是这样的结构体了。
值得注意的是,在glibc的封装中,以pselect为例:
我们在glibc源码的sysdeps/unix/sysv/linux/pselect.c中可以看到:
int
__pselect (int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
const struct timespec *timeout, const sigset_t *sigmask)
{
/* The Linux kernel can in some situations update the timeout value.
We do not want that so use a local variable. */
struct timespec tval;
if (timeout != NULL)
{
tval = *timeout;
timeout = &tval;
}
/* Note: the system call expects 7 values but on most architectures
we can only pass in 6 directly. If there is an architecture with
support for more parameters a new version of this file needs to
be created. */
struct
{
__syscall_ulong_t ss;
__syscall_ulong_t ss_len;
} data;
data.ss = (__syscall_ulong_t) (uintptr_t) sigmask;
data.ss_len = _NSIG / 8;
return SYSCALL_CANCEL (pselect6, nfds, readfds, writefds, exceptfds,
timeout, &data);
}
通过设置一个局部变量tval,使得传入的参数timeout不会被内核修改。ppoll也进行了类似的操作。