diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
index 16054c5ede3425837d0e7426ce58d72e09e8e2af..2da59b9dd2309753982b03474f416c1bff47057b 100644
--- a/util/fdmon-io_uring.c
+++ b/util/fdmon-io_uring.c
@@ -52,9 +52,10 @@ enum {
     FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */
 
     /* AioHandler::flags */
-    FDMON_IO_URING_PENDING = (1 << 0),
-    FDMON_IO_URING_ADD = (1 << 1),
-    FDMON_IO_URING_REMOVE = (1 << 2),
+    FDMON_IO_URING_PENDING            = (1 << 0),
+    FDMON_IO_URING_ADD                = (1 << 1),
+    FDMON_IO_URING_REMOVE             = (1 << 2),
+    FDMON_IO_URING_DELETE_AIO_HANDLER = (1 << 3),
 };
 
 static inline int poll_events_from_pfd(int pfd_events)
@@ -187,20 +188,6 @@ static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
     io_uring_sqe_set_data(sqe, NULL);
 }
 
-/* Add a timeout that self-cancels when another cqe becomes ready */
-static void add_timeout_sqe(AioContext *ctx, int64_t ns)
-{
-    struct io_uring_sqe *sqe;
-    struct __kernel_timespec ts = {
-        .tv_sec = ns / NANOSECONDS_PER_SECOND,
-        .tv_nsec = ns % NANOSECONDS_PER_SECOND,
-    };
-
-    sqe = get_sqe(ctx);
-    io_uring_prep_timeout(sqe, &ts, 1, 0);
-    io_uring_sqe_set_data(sqe, NULL);
-}
-
 /* Add sqes from ctx->submit_list for submission */
 static void fill_sq_ring(AioContext *ctx)
 {
@@ -218,6 +205,16 @@ static void fill_sq_ring(AioContext *ctx)
         if (flags & FDMON_IO_URING_REMOVE) {
             add_poll_remove_sqe(ctx, node);
         }
+        if (flags & FDMON_IO_URING_DELETE_AIO_HANDLER) {
+            /*
+             * process_cqe() sets this flag after ADD and REMOVE have been
+             * cleared. They cannot be set again, so they must be clear.
+             */
+            assert(!(flags & FDMON_IO_URING_ADD));
+            assert(!(flags & FDMON_IO_URING_REMOVE));
+
+            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
+        }
     }
 }
 
@@ -241,7 +238,12 @@ static bool process_cqe(AioContext *ctx,
      */
    flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
-        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
+        if (flags & FDMON_IO_URING_PENDING) {
+            /* Still on ctx->submit_list, defer deletion until fill_sq_ring() */
+            qatomic_or(&node->flags, FDMON_IO_URING_DELETE_AIO_HANDLER);
+        } else {
+            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
+        }
         return false;
     }
 
@@ -275,13 +277,24 @@ static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
 static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                                int64_t timeout)
 {
+    struct __kernel_timespec ts;
     unsigned wait_nr = 1; /* block until at least one cqe is ready */
     int ret;
 
     if (timeout == 0) {
         wait_nr = 0; /* non-blocking */
     } else if (timeout > 0) {
-        add_timeout_sqe(ctx, timeout);
+        /* Add a timeout that self-cancels when another cqe becomes ready */
+        struct io_uring_sqe *sqe;
+
+        ts = (struct __kernel_timespec){
+            .tv_sec = timeout / NANOSECONDS_PER_SECOND,
+            .tv_nsec = timeout % NANOSECONDS_PER_SECOND,
+        };
+
+        sqe = get_sqe(ctx);
+        io_uring_prep_timeout(sqe, &ts, 1, 0);
+        io_uring_sqe_set_data(sqe, NULL);
     }
 
     fill_sq_ring(ctx);
@@ -347,10 +360,13 @@ void fdmon_io_uring_destroy(AioContext *ctx)
         unsigned flags = qatomic_fetch_and(&node->flags,
                                            ~(FDMON_IO_URING_PENDING |
                                              FDMON_IO_URING_ADD |
-                                             FDMON_IO_URING_REMOVE));
+                                             FDMON_IO_URING_REMOVE |
+                                             FDMON_IO_URING_DELETE_AIO_HANDLER));
 
-        if (flags & FDMON_IO_URING_REMOVE) {
-            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
+        if ((flags & FDMON_IO_URING_REMOVE) ||
+            (flags & FDMON_IO_URING_DELETE_AIO_HANDLER)) {
+            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
+                                  node, node_deleted);
         }
 
         QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
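
For reference, the count=1 timeout armed in fdmon_io_uring_wait() above is stock liburing behaviour: io_uring_prep_timeout() with a count of 1 posts a CQE either when the timespec expires (res == -ETIME) or as soon as one other CQE becomes ready (res == 0), which is what "self-cancels" means here. The __kernel_timespec is read by the kernel at submission time, so it has to stay valid until the sqe is actually submitted; presumably that is why ts now lives in the caller's frame instead of inside the removed add_timeout_sqe() helper. The following standalone sketch (not part of the patch, error handling trimmed) shows the same pattern against liburing alone, using a NOP request to stand in for an fd poll completion:

/* Minimal demo of a self-cancelling io_uring timeout; not QEMU code. */
#include <liburing.h>
#include <stdio.h>
#include <string.h>

int main(void)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    /* ts must remain valid until the sqe has been submitted to the kernel */
    struct __kernel_timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
    int ret;

    ret = io_uring_queue_init(8, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "queue_init: %s\n", strerror(-ret));
        return 1;
    }

    /* Timeout that fires after 5s *or* once one other cqe has been posted */
    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_timeout(sqe, &ts, 1, 0);
    io_uring_sqe_set_data(sqe, NULL);

    /* A no-op request; its completion satisfies the timeout's count of 1 */
    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_nop(sqe);
    io_uring_sqe_set_data(sqe, (void *)0x1);

    io_uring_submit_and_wait(&ring, 1); /* block until at least one cqe */

    for (int i = 0; i < 2; i++) {
        if (io_uring_wait_cqe(&ring, &cqe) < 0) {
            break;
        }
        if (io_uring_cqe_get_data(cqe) == NULL) {
            /* res == 0: count reached early; res == -ETIME: timer expired */
            printf("timeout cqe, res=%d\n", cqe->res);
        } else {
            printf("nop cqe, res=%d\n", cqe->res);
        }
        io_uring_cqe_seen(&ring, cqe);
    }

    io_uring_queue_exit(&ring);
    return 0;
}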