以下是eventfd
的基本使用方法:
使用eventfd
系统调用创建一个事件文件描述符:
int efd = eventfd(0, 0);
if (efd == -1) {
perror("eventfd");
exit(EXIT_FAILURE);
}
// 使用write函数向eventfd写入一个值,以通知等待该事件的线程或进程
uint64_t u = 1; // 发送的事件计数
if (write(efd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
perror("write");
}
// 使用read函数从eventfd读取事件值。每次读取后,事件计数将减少:
uint64_t u;
if (read(efd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
perror("read");
}
eventfd
函数的参数如下:
int eventfd(unsigned int initval, int flags);
initval
是事件计数的初始值。创建的eventfd
文件描述符将开始于这个值。通常,初始化为0表示没有事件发生;可以根据需要设置为其他值,以便在开始时有特定数量的事件。
flags
:用于设置eventfd
的行为标志。可选值包括:
0
:使用默认设置。EFD_SEMAPHORE
:将eventfd
视为信号量,每次读取将计数器的值减1,若不包含此flag,读取时将计数器清零。EFD_CLOEXEC
:在执行exec
时关闭eventfd
。EFD_NONBLOCK
:使得对eventfd
的操作变为非阻塞。即,如果没有事件可以读取,read
将立即返回,而不会阻塞。
其实现如下:
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{
int fd, error;
struct file *file;
error = get_unused_fd_flags(flags & EFD_SHARED_FCNTL_FLAGS);
if (error < 0)
return error;
fd = error;
// 在匿名节点文件系统(anon_inodefs)中创建一个文件结构体
file = eventfd_file_create(count, flags);
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto err_put_unused_fd;
}
fd_install(fd, file); // 安装文件描述符,将fd和file关联起来
return fd;
err_put_unused_fd:
put_unused_fd(fd);
return error;
}
eventfd_file_create
函数实现如下:
struct file *eventfd_file_create(unsigned int count, int flags)
{
struct file *file;
struct eventfd_ctx *ctx;
/* Check the EFD_* constants for consistency. */
BUILD_BUG_ON(EFD_CLOEXEC != O_CLOEXEC);
BUILD_BUG_ON(EFD_NONBLOCK != O_NONBLOCK);
if (flags & ~EFD_FLAGS_SET)
return ERR_PTR(-EINVAL);
ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
if (!ctx)
return ERR_PTR(-ENOMEM);
kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh); // 初始化等待队列
ctx->count = count; // 初始化计数器值
ctx->flags = flags;
// 这里将ctx作为私有数据赋值给file结构体中的private_data字段
// 这样通过fd可以在进程的fd表中找到相应file结构体,最终找到eventfd_ctx
file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx,
O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
if (IS_ERR(file))
eventfd_free_ctx(ctx);
return file;
}
当对eventfd进行读时,会调用anon_inode_getfile
注册的eventfd_read
接口:
static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 cnt;
if (count < sizeof(cnt))
return -EINVAL;
// /* 如果count非0,将其放在cnt中返回给用户进程 */
res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt);
if (res < 0)
return res;
return put_user(cnt, (__u64 __user *) buf) ? -EFAULT : sizeof(cnt);
}
进而调用到eventfd_ctx_read
:
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{
ssize_t res;
DECLARE_WAITQUEUE(wait, current);
spin_lock_irq(&ctx->wqh.lock);
*cnt = 0;
res = -EAGAIN;
if (ctx->count > 0) /* 如果count大于0,读进程不阻塞 */
res = 0;
else if (!no_wait) { /* count小于0,阻塞方式读 */
__add_wait_queue(&ctx->wqh, &wait); /* 将等待队列项添加到eventfd的等待队列头中 */
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ctx->count > 0) {
res = 0;
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
/* 跳出,说明此时count大于0了, 将当前进程从eventfd的等待队列中删除 */
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING); /* 设置运行状态 */
}
if (likely(res == 0)) {
eventfd_ctx_do_read(ctx, cnt); /* 读取counter */
if (waitqueue_active(&ctx->wqh)) /* 如果eventfd上有阻塞的写进程,将其唤醒 */
wake_up_locked_poll(&ctx->wqh, POLLOUT);
}
spin_unlock_irq(&ctx->wqh.lock);
return res;
}
在eventfd_ctx_do_read
函数中,会依据读取方式执行真正的读取动作:
- 信号量方式:count计数器减1,返回1
- 非信号量方式:count计数器置0,返回置0前计数器值
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{
*cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;
ctx->count -= *cnt;
}
写event fd时会调用:
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt;
DECLARE_WAITQUEUE(wait, current);
if (count < sizeof(ucnt))
return -EINVAL;
if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
return -EFAULT;
if (ucnt == ULLONG_MAX)
return -EINVAL;
spin_lock_irq(&ctx->wqh.lock);
res = -EAGAIN;
if (ULLONG_MAX - ctx->count > ucnt) // ucnt+ctx->count < ULLONG_MAX (即内核计数器不会溢出)
res = sizeof(ucnt);
else if (!(file->f_flags & O_NONBLOCK)) { // 计数器溢出,并且eventfd打开方式为阻塞
__add_wait_queue(&ctx->wqh, &wait);
for (res = 0;;) {
set_current_state(TASK_INTERRUPTIBLE); // 阻塞自身执行
if (ULLONG_MAX - ctx->count > ucnt) { // 被唤醒后再次判断是否会溢出
res = sizeof(ucnt);
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
__remove_wait_queue(&ctx->wqh, &wait); /* 不会溢出,跳出循环,从等待队列中退出 */
__set_current_state(TASK_RUNNING); /* 设置运行状态 */
}
if (likely(res > 0)) {
ctx->count += ucnt; /* 增加内核计数器 */
if (waitqueue_active(&ctx->wqh)) /* 如果eventfd上有阻塞的读进程,将其唤醒 */
wake_up_locked_poll(&ctx->wqh, POLLIN);
}
spin_unlock_irq(&ctx->wqh.lock);
return res;
}