poll, select, pselect6, ppoll系统调用

selectpoll

系统调用号

poll为7,select为23。

函数原型

内核接口

asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds, int timeout);
asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp);

glibc封装

poll

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

select

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

简介

selectpoll都是为了实现IO多路复用的功能。

一般来说,对硬盘上的文件的读取都不会阻塞。但是,对管道、套接字、伪终端等文件的读取,是可能产生阻塞的。举个例子来说,如果我们读取stdin

int fd = STDIN_FILENO; // stdin
char buf[64];
read(fd, buf, 64); // blocks here
process_read_content(buf);

那么,在执行read时,如果我们一直不向终端输入,那么这里会始终阻塞着,程序永远不会执行到之后的process_read_context(buf)。在这种情况下,这种行为是符合逻辑的,因为我们之后的语句是依赖读入的内容buf的。所以除非我们收到了buf的内容,否则就不应该执行之后的指令。

但是,如果有多个文件需要读入,就产生了问题。假设我们有一个nfd个元素的文件描述符数组fds,我们需要对他们读入,并彼此独立地分别处理每个读入的内容。

  • 方案一

    void process_fds(int *fds, int nfd) {
        for (int i = 0; i < nfd; i++) {
            char buf[64];
            read(fds[i], buf, 64);
            process_read_content(buf);
        }
    }
    

    这个方案能完成我们的需求,但是效率实在是太低了。由于是按顺序依次处理读入的内容,如果fds[0]始终没有输入,但是fds[1]早就有了输入。我们明明可以先处理fds[1]的输入的,但是由于进程阻塞在了fds[0]read操作中,我们的时间就这样被白白浪费了。

  • 方案二

    既然每个文件描述符处理读入是互相独立的,我们就可以创建nfd个线程,每个线程中处理其读入。

    这种方案确实可以解决我们方案一中的问题,但是线程的创建、线程之间的切换是非常耗费时间的。

为了更高效地解决这个问题,我们可以增加一个新的操作——判断某个文件描述符是否可以读入。我们可以遍历文件描述符,判断是否有已经可以读入的,如果有的话就直接处理,如果没有的话就再次遍历。这样几乎没有耗费的时间。

我们甚至可以想出更高效的方案,在主线程中查询是否有可以读取的文件描述符,然后把可以读取的文件描述符给别的线程执行。

select就为我们提供了一个类似的解决方案。我们给select传入需要检测IO状态(可以读入、可以写入等)的文件描述符集合,select立即返回,告诉我们哪些文件描述符的IO已经准备就绪。

poll的功能和select类似,但解决了一些select的缺点。具体请见下面的用法一节。

用法

select

select的函数签名为

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

readfds, writefdsexceptfds是文件描述符集合,分别用于:

  • readfds

    已经准备好供读取的文件描述符集合,即read操作不会阻塞。

  • writefds

    已经准备好供写入的文件描述符集合,即write操作不会阻塞。

  • exceptfds

    其余条件的文件描述符集合。包括:

    • TCP套接字有带外数据
    • 处于包模式下的伪终端的主端检测到从端的状态变化
    • cgroup.events文件被修改

如果对相应的状态变化不感兴趣,在对应的参数中传递NULL即可。

我们可以用以下几个函数操作fd_set类型的变量:

void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
  • FD_ZERO

    set清空

  • FD_SET

    fd放入set

  • FD_CLR

    fdset中移除

  • FD_ISSET

    判断fd是否处于set

nfdsreadfds, writefds, exceptfds中,数值最大的文件描述符加1。如readfds包含文件描述符4, 6, 7,writefds包含文件描述符5,exceptfds为空,则nfds为8。

timeout为超时参数,其结构为

struct timeval {
    time_t      tv_sec;         /* seconds */
    suseconds_t tv_usec;        /* microseconds */
};

如果timeout指针为NULL,则select将一直等待,直到有一个文件描述符准备好。如果tv_sectv_usec均为0,则select将立即返回。否则,select如果等待达到timeout的时间,还没有任何文件描述符准备好,就返回。

当函数返回之后,会有如下变化:

  • 返回值为readfds, writefds, exceptfds中准备好的文件描述符的总数
  • readfds, writefds, exceptfds中会只保留已经处于准备好状态的文件描述符。我们可以通过FD_ISSET去查看哪些文件描述符准备好。(正因如此,如果我们在一个循环中使用select,那在每次使用之前,需要复制一遍各集合,或用FD_CLR清空后重新添加)
  • select可能会更新timeout参数。

综上,如果我们要用select,按第三个方案来实现我们的功能,其写法为

void process_fds(int *fds, int nfd) {
    fd_set rset;
    FD_ZERO(&rset);
    int maxfd = -1;
    for (int i = 0; i < nfd; i++) {
        FD_SET(fds[i], &rset);
        if (fds[i] > maxfd) {
            maxfd = fds[i];
        }
    }
    while (1) {
        fd_set tmp_rset;
        memcpy(&tmp_rset, &rset, sizeof(fd_set));
        if (select(maxfd + 1, &tmp_rset, NULL, NULL, NULL) <= 0) {
            break;
        }
        for (int i = 0; i < nfd; i++) {
            if (FD_ISSET(fds[i], &tmp_rset)) {
                FD_CLR(fds[i], &rset);
                char buf[64];
                read(fds[i], buf, 64);
                process_read_content(buf);
            }
        }
    }
}

此外,值得注意的是,glibc的封装要求我们的文件描述符的值不能超过FD_SETSIZE,也就是1024。

poll

poll的函数签名为

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

select不同的是,它并不是把文件描述符放在fd_set结构体中,而是放在一个struct pollfd类型组成的数组中,nfds为该数组的长度。

struct pollfd的定义为

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

fd是文件描述符,events是用户感兴趣的事件(类似于select中的readfds, writefdsexceptfds),由用户填写;revents是实际发生的事件,由内核填写。

eventsrevents是位掩码,其可以包含的标志位有

  • POLLIN:存在数据可以读入(相当于select中的readfds

  • POLLPRI:存在其他条件满足(相当于select中的exceptfds

  • POLLOUT:存在数据可以写入(相当于select中的writefds

  • POLLRDHUP

    流套接字对端关闭连接。

    需定义_GNU_SOURCE宏。

  • POLLERR

    出错。

    只可由revents包含,不可由events包含

  • POLLHUP

    挂起。

    只可由revents包含,不可由events包含

  • POLLNVAL

    由于fd未打开,请求无效。

    只可由revents包含,不可由events包含

events为0时,revents只可返回POLLERR, POLLHUPPOLLNVAL(将events置为0类似于select中的FD_CLR)。若其不为0,则可以返回events中包含的事件,以及POLLERR, POLLHUPPOLLNVAL。如果返回的revents为0,则表示什么都没发生,可能超时了,或者别的文件描述符中发生了用户感兴趣的事。

timeout参数表示其最多等待时间(以毫秒为单位)。如果其为负,则表示poll无限等待;如果其为0,则表示poll立即返回。

如果在超时范围内,任何一个用户感兴趣的事件发生了,poll将会返回,返回值为产生用户感兴趣事件的文件描述符个数;如果超时了,没有任何一个用户感兴趣的事件发生,则poll将会返回0。

综上,如果我们要用poll,按第三个方案来实现我们的功能,其写法为

void process_fds(int *fds, int nfd) {
    struct pollfd *pollfds = (struct pollfd *)malloc(nfd * sizeof(struct pollfd));
    for (int i = 0; i < nfd; i++) {
        pollfds[i].fd = fds[i];
        pollfds[i].events = POLLIN;
    }
    while (1) {
        if (poll(pollfds, nfd, -1) <= 0) {
            break;
        }
        for (int i = 0; i < nfd; i++) {
            if (pollfds[i].revents & POLLIN) {
                pollfds[i].events = 0;
                char buf[64];
                read(fds[i], buf, 64);
                process_read_content(buf);
            }
        }
    }
}

select不同的是,其可以包含的文件描述符个数无上限。

根据上述的讨论,pollselect的区别在于

  • poll文件描述符个数无上限,select文件描述符其值上限为FD_SETSIZE
  • poll感兴趣的事件种类更多
  • poll不需要在每次调用前都复制一遍fd_set,也就是poll不会改变传入的fds
  • poll超时参数精度为毫秒,select超时参数精度为微秒,select更精确。

实现

select

首先我们来看看fd_set与和其相关的函数是怎么实现的。在Linux内核的include/linux/types.h中可以看到

typedef __kernel_fd_set		fd_set;

而在include/uapi/linux/posix_types.h中可以看到

#define __FD_SETSIZE	1024

typedef struct {
	unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

所以,这其实就是一个长度为1024字节的位数组。同时我们也明白了,为什么select要求文件描述符的值不能超过FD_SETSIZE

同时,我们也可以在glibc的源码misc/sys/select.h中看到和其相关的函数的定义

#define	FD_SET(fd, fdsetp)	__FD_SET (fd, fdsetp)
#define	FD_CLR(fd, fdsetp)	__FD_CLR (fd, fdsetp)
#define	FD_ISSET(fd, fdsetp)	__FD_ISSET (fd, fdsetp)
#define	FD_ZERO(fdsetp)		__FD_ZERO (fdsetp)

其实现则位于bits/select.h

#define __FD_ZERO(s) \
  do {									      \
    unsigned int __i;							      \
    fd_set *__arr = (s);						      \
    for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)	      \
      __FDS_BITS (__arr)[__i] = 0;					      \
  } while (0)
#define __FD_SET(d, s) \
  ((void) (__FDS_BITS (s)[__FD_ELT(d)] |= __FD_MASK(d)))
#define __FD_CLR(d, s) \
  ((void) (__FDS_BITS (s)[__FD_ELT(d)] &= ~__FD_MASK(d)))
#define __FD_ISSET(d, s) \
  ((__FDS_BITS (s)[__FD_ELT (d)] & __FD_MASK (d)) != 0)

简单来说,就是:

  • FD_ZERO将整个位数组清0(不用memset的原因是,这可能需要在之前声明memset的原型,并且这个数组其实并不大)
  • FD_SET将该文件描述符对应的比特位置1
  • FD_CLR将该文件描述符对应的比特位置0
  • FD_ISSET判断该文件描述符对应的比特位是否为1

接着,我们来看看select内部的实现。其实现均位于Linux内核源码的fs/select.c文件中。

首先,在core_sys_select函数里,使用了一个fd_set_bits的结构体,其定义为:

typedef struct {
	unsigned long *in, *out, *ex;
	unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;

一共六个位数组,前三个是存储我们传入的参数的,后三个存储结果,在最终再复制进前三个中。

select的实现,最主要的就是do_select函数,其内容非常长,但也十分重要:

static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	poll_table *wait;
	int retval, i, timed_out = 0;
	u64 slack = 0;
	__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
	unsigned long busy_start = 0;

	rcu_read_lock();
	retval = max_select_fd(n, fds);
	rcu_read_unlock();

	if (retval < 0)
		return retval;
	n = retval;

	poll_initwait(&table);
	wait = &table.pt;
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
		wait->_qproc = NULL;
		timed_out = 1;
	}

	if (end_time && !timed_out)
		slack = select_estimate_accuracy(end_time);

	retval = 0;
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
		bool can_busy_loop = false;

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;
			__poll_t mask;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {
				i += BITS_PER_LONG;
				continue;
			}

			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				if (i >= n)
					break;
				if (!(bit & all_bits))
					continue;
				f = fdget(i);
				if (f.file) {
					wait_key_set(wait, in, out, bit,
						     busy_flag);
					mask = vfs_poll(f.file, wait);

					fdput(f);
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					/* got something, stop busy polling */
					if (retval) {
						can_busy_loop = false;
						busy_flag = 0;

					/*
					 * only remember a returned
					 * POLL_BUSY_LOOP if we asked for it
					 */
					} else if (busy_flag & mask)
						can_busy_loop = true;

				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
			cond_resched();
		}
		wait->_qproc = NULL;
		if (retval || timed_out || signal_pending(current))
			break;
		if (table.error) {
			retval = table.error;
			break;
		}

		/* only if found POLL_BUSY_LOOP sockets && not out of time */
		if (can_busy_loop && !need_resched()) {
			if (!busy_start) {
				busy_start = busy_loop_current_time();
				continue;
			}
			if (!busy_loop_timeout(busy_start))
				continue;
		}
		busy_flag = 0;

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec64_to_ktime(*end_time);
			to = &expire;
		}

		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;
	}

	poll_freewait(&table);

	return retval;
}

fd_set_bits类型的fds,其表示输入的字段复制于系统调用的输入,其表示输出的字段在调用前被清空。

其主体部分为两层嵌套的循环。在最外层循环中,每一轮循环处理BITS_PER_LONG,也就是一般来说64个文件描述符。这是因为我们的数据是用long的位数组来存储的,所以这样分批次效率更高。在内循环中,我们遍历这个long整型的每个字节,其每个字节对应一个文件描述符。

也就是说,我们从0开始,一直到我们传入系统调用的参数n,也就是最大的文件描述符的值,遍历每个文件描述符,在在53行看到,通过bit & all_bits,判断当前的文件描述符是否在我们之前传入的in, outex集合中。对于存在集合中的,最终调用了vfs_poll来查询单个文件的状态,其实现位于fs/poll.h中:

static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
	if (unlikely(!file->f_op->poll))
		return DEFAULT_POLLMASK;
	return file->f_op->poll(file, pt);
}

依旧是函数指针模拟多态。

最终,再把res_in, res_out, res_ex复制回原本的in, out, ex即可。

总的来说,select的步骤是,对于输入的参数nfds,把值从0到nfds - 1的所有相关的文件描述符都查询一遍,对于每个文件描述符,调用vfs_poll查询状态。

poll

poll的实现位于fs/select.c,其核心do_poll依然很长,但也十分重要:

static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time)
{
	poll_table* pt = &wait->pt;
	ktime_t expire, *to = NULL;
	int timed_out = 0, count = 0;
	u64 slack = 0;
	__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
	unsigned long busy_start = 0;

	/* Optimise the no-wait case */
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
		pt->_qproc = NULL;
		timed_out = 1;
	}

	if (end_time && !timed_out)
		slack = select_estimate_accuracy(end_time);

	for (;;) {
		struct poll_list *walk;
		bool can_busy_loop = false;

		for (walk = list; walk != NULL; walk = walk->next) {
			struct pollfd * pfd, * pfd_end;

			pfd = walk->entries;
			pfd_end = pfd + walk->len;
			for (; pfd != pfd_end; pfd++) {
				/*
				 * Fish for events. If we found one, record it
				 * and kill poll_table->_qproc, so we don't
				 * needlessly register any other waiters after
				 * this. They'll get immediately deregistered
				 * when we break out and return.
				 */
				if (do_pollfd(pfd, pt, &can_busy_loop,
					      busy_flag)) {
					count++;
					pt->_qproc = NULL;
					/* found something, stop busy polling */
					busy_flag = 0;
					can_busy_loop = false;
				}
			}
		}
		/*
		 * All waiters have already been registered, so don't provide
		 * a poll_table->_qproc to them on the next loop iteration.
		 */
		pt->_qproc = NULL;
		if (!count) {
			count = wait->error;
			if (signal_pending(current))
				count = -ERESTARTNOHAND;
		}
		if (count || timed_out)
			break;

		/* only if found POLL_BUSY_LOOP sockets && not out of time */
		if (can_busy_loop && !need_resched()) {
			if (!busy_start) {
				busy_start = busy_loop_current_time();
				continue;
			}
			if (!busy_loop_timeout(busy_start))
				continue;
		}
		busy_flag = 0;

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec64_to_ktime(*end_time);
			to = &expire;
		}

		if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
			timed_out = 1;
	}
	return count;
}

传入的参数list的类型是struct poll_list的指针:

struct poll_list {
	struct poll_list *next;
	int len;
	struct pollfd entries[0];
};

也就是说,这里是一个链表。其entries字段是一个变长数组。我们传入do_polllist参数是将传入系统调用的fds,也就是由nfdsstruct pollfd类型的实例组成的数组,在之前的do_sys_poll函数中,被分成长度为POLLFD_PER_PAGE的若干个部分,然后再将每个部分用链表串联起来。这样做的原因应该就是保证每次处理的一批不会超过页大小,尽量减少换页。

do_poll函数中我们可以看到,对每个链表项,遍历了其entries数组,也就相当于对我们传入的fds进行遍历。对每一个文件描述符,使用do_pollfd(pfd, pt, &can_busy_loop, busy_flag)来进行真正的poll操作,而do_pollfd,实际上就是调用的vfs_poll

pselect6ppoll

系统调用号

pselect6为270,ppoll为271。

函数签名

内核接口

asmlinkage long sys_pselect6(int, fd_set __user *, fd_set __user *, fd_set __user *, struct __kernel_timespec __user *, void __user *);
asmlinkage long sys_ppoll(struct pollfd __user *, unsigned int, struct __kernel_timespec __user *, const sigset_t __user *, size_t);

glibc封装

pselect6

#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout,  const sigset_t *sigmask);

ppoll

#define _GNU_SOURCE
#include <signal.h>
#include <poll.h>
int ppoll(struct pollfd *fds, nfds_t nfds, const struct timespec *tmo_p, const sigset_t *sigmask);

简介

pselectselect的区别主要在于:

  • 超时精度
    • select使用的是struct timeval结构体,精确到微秒
    • pselect使用的是struct timespec结构体,精确到纳秒
  • 超时参数
    • select可能会更新其超时参数timeout
    • pselect6系统调用可能会更新其超时参数,glibc的封装pselect不会更新其超时参数
  • 信号掩码
    • pselect可以设置信号掩码,若其为NULL,则行为与select相同

ppollpoll的区别主要在于:

  • 超时精度
    • poll使用的超时精度为毫秒
    • ppoll使用的是struct timespec结构体,精确到纳秒
  • 信号掩码
    • ppoll可以设置信号掩码,若其为NULL,则行为与poll相同

pselectppoll可以看作执行selectpoll前后设置信号掩码。之所以需要这两个单独的系统调用,是因为如果我们的需求是,要么接收到特定的信号,要么某个文件描述符准备好,然后执行后续的操作。如果我们分为两步操作,但是接受信号实际上是在判断是否接收到信号之后,也就是判断结果为没接收到信号,而且在调用select之前,那么就可能陷入无限等待。pselectppoll进行信号判断的时候则是使用原子操作,所以不会产生这样的竞争条件。

实现

pselect6ppoll在Linux内核中的实现与selectpoll类似。有一点需要说明的是,pselect6接受的最后一个参数是void *类型的,这是因为它实际上需要的类型为

struct {
    const kernel_sigset_t *ss;   /* Pointer to signal set */
    size_t ss_len;               /* Size (in bytes) of object
                                    pointed to by 'ss' */
};

本来其实可以把这两个字段变成两个函数的参数的,但由于Linux x86_64的ABI要求系统调用至多只能接受6个参数,所以最后一个参数只能是这样的结构体了。

值得注意的是,在glibc的封装中,以pselect为例:

我们在glibc源码的sysdeps/unix/sysv/linux/pselect.c中可以看到:

int
__pselect (int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
	   const struct timespec *timeout, const sigset_t *sigmask)
{
  /* The Linux kernel can in some situations update the timeout value.
     We do not want that so use a local variable.  */
  struct timespec tval;
  if (timeout != NULL)
    {
      tval = *timeout;
      timeout = &tval;
    }

  /* Note: the system call expects 7 values but on most architectures
     we can only pass in 6 directly.  If there is an architecture with
     support for more parameters a new version of this file needs to
     be created.  */
  struct
  {
    __syscall_ulong_t ss;
    __syscall_ulong_t ss_len;
  } data;

  data.ss = (__syscall_ulong_t) (uintptr_t) sigmask;
  data.ss_len = _NSIG / 8;

  return SYSCALL_CANCEL (pselect6, nfds, readfds, writefds, exceptfds,
                         timeout, &data);
}

通过设置一个局部变量tval,使得传入的参数timeout不会被内核修改。ppoll也进行了类似的操作。