123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914 |
- //
- // detail/impl/io_uring_service.ipp
- // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- //
- // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- #ifndef ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
- #define ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- # pragma once
- #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
- #include "asio/detail/config.hpp"
- #if defined(ASIO_HAS_IO_URING)
- #include <cstddef>
- #include <sys/eventfd.h>
- #include "asio/detail/io_uring_service.hpp"
- #include "asio/detail/reactor_op.hpp"
- #include "asio/detail/scheduler.hpp"
- #include "asio/detail/throw_error.hpp"
- #include "asio/error.hpp"
- #include "asio/detail/push_options.hpp"
- namespace asio {
- namespace detail {
- io_uring_service::io_uring_service(asio::execution_context& ctx)
- : execution_context_service_base<io_uring_service>(ctx),
- scheduler_(use_service<scheduler>(ctx)),
- mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
- REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
- outstanding_work_(0),
- submit_sqes_op_(this),
- pending_sqes_(0),
- pending_submit_sqes_op_(false),
- shutdown_(false),
- timeout_(),
- registration_mutex_(mutex_.enabled()),
- reactor_(use_service<reactor>(ctx)),
- reactor_data_(),
- event_fd_(-1)
- {
- reactor_.init_task();
- init_ring();
- register_with_reactor();
- }
- io_uring_service::~io_uring_service()
- {
- if (ring_.ring_fd != -1)
- ::io_uring_queue_exit(&ring_);
- if (event_fd_ != -1)
- ::close(event_fd_);
- }
- void io_uring_service::shutdown()
- {
- mutex::scoped_lock lock(mutex_);
- shutdown_ = true;
- lock.unlock();
- op_queue<operation> ops;
- // Cancel all outstanding operations.
- while (io_object* io_obj = registered_io_objects_.first())
- {
- for (int i = 0; i < max_ops; ++i)
- {
- if (!io_obj->queues_[i].op_queue_.empty())
- {
- ops.push(io_obj->queues_[i].op_queue_);
- if (::io_uring_sqe* sqe = get_sqe())
- ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
- }
- }
- io_obj->shutdown_ = true;
- registered_io_objects_.free(io_obj);
- }
- // Cancel the timeout operation.
- if (::io_uring_sqe* sqe = get_sqe())
- ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
- submit_sqes();
- // Wait for all completions to come back.
- for (; outstanding_work_ > 0; --outstanding_work_)
- {
- ::io_uring_cqe* cqe = 0;
- if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
- break;
- }
- timer_queues_.get_all_timers(ops);
- scheduler_.abandon_operations(ops);
- }
- void io_uring_service::notify_fork(
- asio::execution_context::fork_event fork_ev)
- {
- switch (fork_ev)
- {
- case asio::execution_context::fork_prepare:
- {
- // Cancel all outstanding operations. They will be restarted
- // after the fork completes.
- mutex::scoped_lock registration_lock(registration_mutex_);
- for (io_object* io_obj = registered_io_objects_.first();
- io_obj != 0; io_obj = io_obj->next_)
- {
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- for (int i = 0; i < max_ops; ++i)
- {
- if (!io_obj->queues_[i].op_queue_.empty()
- && !io_obj->queues_[i].cancel_requested_)
- {
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
- }
- }
- }
- // Cancel the timeout operation.
- {
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
- submit_sqes();
- }
- // Wait for all completions to come back, and post all completed I/O
- // queues to the scheduler. Note that some operations may have already
- // completed, or were explicitly cancelled. All others will be
- // automatically restarted.
- op_queue<operation> ops;
- for (; outstanding_work_ > 0; --outstanding_work_)
- {
- ::io_uring_cqe* cqe = 0;
- if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
- break;
- if (void* ptr = ::io_uring_cqe_get_data(cqe))
- {
- if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
- {
- io_queue* io_q = static_cast<io_queue*>(ptr);
- io_q->set_result(cqe->res);
- ops.push(io_q);
- }
- }
- }
- scheduler_.post_deferred_completions(ops);
- // Restart and eventfd operation.
- register_with_reactor();
- }
- break;
- case asio::execution_context::fork_parent:
- // Restart the timeout and eventfd operations.
- update_timeout();
- register_with_reactor();
- break;
- case asio::execution_context::fork_child:
- {
- // The child process gets a new io_uring instance.
- ::io_uring_queue_exit(&ring_);
- init_ring();
- register_with_reactor();
- }
- break;
- default:
- break;
- }
- }
- void io_uring_service::init_task()
- {
- scheduler_.init_task();
- }
- void io_uring_service::register_io_object(
- io_uring_service::per_io_object_data& io_obj)
- {
- io_obj = allocate_io_object();
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- io_obj->service_ = this;
- io_obj->shutdown_ = false;
- for (int i = 0; i < max_ops; ++i)
- {
- io_obj->queues_[i].io_object_ = io_obj;
- io_obj->queues_[i].cancel_requested_ = false;
- }
- }
- void io_uring_service::register_internal_io_object(
- io_uring_service::per_io_object_data& io_obj,
- int op_type, io_uring_operation* op)
- {
- io_obj = allocate_io_object();
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- io_obj->service_ = this;
- io_obj->shutdown_ = false;
- for (int i = 0; i < max_ops; ++i)
- {
- io_obj->queues_[i].io_object_ = io_obj;
- io_obj->queues_[i].cancel_requested_ = false;
- }
- io_obj->queues_[op_type].op_queue_.push(op);
- io_object_lock.unlock();
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- {
- op->prepare(sqe);
- ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
- post_submit_sqes_op(lock);
- }
- else
- {
- asio::error_code ec(ENOBUFS,
- asio::error::get_system_category());
- asio::detail::throw_error(ec, "io_uring_get_sqe");
- }
- }
- void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
- {
- int result = ::io_uring_register_buffers(&ring_, v, n);
- if (result < 0)
- {
- asio::error_code ec(-result,
- asio::error::get_system_category());
- asio::detail::throw_error(ec, "io_uring_register_buffers");
- }
- }
- void io_uring_service::unregister_buffers()
- {
- (void)::io_uring_unregister_buffers(&ring_);
- }
- void io_uring_service::start_op(int op_type,
- io_uring_service::per_io_object_data& io_obj,
- io_uring_operation* op, bool is_continuation)
- {
- if (!io_obj)
- {
- op->ec_ = asio::error::bad_descriptor;
- post_immediate_completion(op, is_continuation);
- return;
- }
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- if (io_obj->shutdown_)
- {
- io_object_lock.unlock();
- post_immediate_completion(op, is_continuation);
- return;
- }
- if (io_obj->queues_[op_type].op_queue_.empty())
- {
- if (op->perform(false))
- {
- io_object_lock.unlock();
- scheduler_.post_immediate_completion(op, is_continuation);
- }
- else
- {
- io_obj->queues_[op_type].op_queue_.push(op);
- io_object_lock.unlock();
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- {
- op->prepare(sqe);
- ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
- scheduler_.work_started();
- post_submit_sqes_op(lock);
- }
- else
- {
- lock.unlock();
- io_obj->queues_[op_type].set_result(-ENOBUFS);
- post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
- }
- }
- }
- else
- {
- io_obj->queues_[op_type].op_queue_.push(op);
- scheduler_.work_started();
- }
- }
- void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
- {
- if (!io_obj)
- return;
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- op_queue<operation> ops;
- do_cancel_ops(io_obj, ops);
- io_object_lock.unlock();
- scheduler_.post_deferred_completions(ops);
- }
- void io_uring_service::cancel_ops_by_key(
- io_uring_service::per_io_object_data& io_obj,
- int op_type, void* cancellation_key)
- {
- if (!io_obj)
- return;
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- bool first = true;
- op_queue<operation> ops;
- op_queue<io_uring_operation> other_ops;
- while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
- {
- io_obj->queues_[op_type].op_queue_.pop();
- if (op->cancellation_key_ == cancellation_key)
- {
- if (first)
- {
- other_ops.push(op);
- if (!io_obj->queues_[op_type].cancel_requested_)
- {
- io_obj->queues_[op_type].cancel_requested_ = true;
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- {
- ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
- submit_sqes();
- }
- }
- }
- else
- {
- op->ec_ = asio::error::operation_aborted;
- ops.push(op);
- }
- }
- else
- other_ops.push(op);
- first = false;
- }
- io_obj->queues_[op_type].op_queue_.push(other_ops);
- io_object_lock.unlock();
- scheduler_.post_deferred_completions(ops);
- }
- void io_uring_service::deregister_io_object(
- io_uring_service::per_io_object_data& io_obj)
- {
- if (!io_obj)
- return;
- mutex::scoped_lock io_object_lock(io_obj->mutex_);
- if (!io_obj->shutdown_)
- {
- op_queue<operation> ops;
- bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
- io_obj->shutdown_ = true;
- io_object_lock.unlock();
- scheduler_.post_deferred_completions(ops);
- if (pending_cancelled_ops)
- {
- // There are still pending operations. Prevent cleanup_io_object from
- // freeing the I/O object and let the last operation to complete free it.
- io_obj = 0;
- }
- else
- {
- // Leave io_obj set so that it will be freed by the subsequent call to
- // cleanup_io_object.
- }
- }
- else
- {
- // We are shutting down, so prevent cleanup_io_object from freeing
- // the I/O object and let the destructor free it instead.
- io_obj = 0;
- }
- }
- void io_uring_service::cleanup_io_object(
- io_uring_service::per_io_object_data& io_obj)
- {
- if (io_obj)
- {
- free_io_object(io_obj);
- io_obj = 0;
- }
- }
- void io_uring_service::run(long usec, op_queue<operation>& ops)
- {
- __kernel_timespec ts;
- int local_ops = 0;
- if (usec > 0)
- {
- ts.tv_sec = usec / 1000000;
- ts.tv_nsec = (usec % 1000000) * 1000;
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- {
- ++local_ops;
- ::io_uring_prep_timeout(sqe, &ts, 0, 0);
- ::io_uring_sqe_set_data(sqe, &ts);
- submit_sqes();
- }
- }
- ::io_uring_cqe* cqe = 0;
- int result = (usec == 0)
- ? ::io_uring_peek_cqe(&ring_, &cqe)
- : ::io_uring_wait_cqe(&ring_, &cqe);
- if (local_ops > 0)
- {
- if (result != 0 || ::io_uring_cqe_get_data(cqe) != &ts)
- {
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- {
- ++local_ops;
- ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
- ::io_uring_sqe_set_data(sqe, &ts);
- submit_sqes();
- }
- }
- }
- bool check_timers = false;
- int count = 0;
- while (result == 0 || local_ops > 0)
- {
- if (result == 0)
- {
- if (void* ptr = ::io_uring_cqe_get_data(cqe))
- {
- if (ptr == this)
- {
- // The io_uring service was interrupted.
- }
- else if (ptr == &timer_queues_)
- {
- check_timers = true;
- }
- else if (ptr == &timeout_)
- {
- check_timers = true;
- timeout_.tv_sec = 0;
- timeout_.tv_nsec = 0;
- }
- else if (ptr == &ts)
- {
- --local_ops;
- }
- else
- {
- io_queue* io_q = static_cast<io_queue*>(ptr);
- io_q->set_result(cqe->res);
- ops.push(io_q);
- }
- }
- ::io_uring_cqe_seen(&ring_, cqe);
- ++count;
- }
- result = (count < complete_batch_size || local_ops > 0)
- ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
- }
- decrement(outstanding_work_, count);
- if (check_timers)
- {
- mutex::scoped_lock lock(mutex_);
- timer_queues_.get_ready_timers(ops);
- if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
- {
- timeout_ = get_timeout();
- if (::io_uring_sqe* sqe = get_sqe())
- {
- ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
- ::io_uring_sqe_set_data(sqe, &timeout_);
- push_submit_sqes_op(ops);
- }
- }
- }
- }
- void io_uring_service::interrupt()
- {
- mutex::scoped_lock lock(mutex_);
- if (::io_uring_sqe* sqe = get_sqe())
- {
- ::io_uring_prep_nop(sqe);
- ::io_uring_sqe_set_data(sqe, this);
- }
- submit_sqes();
- }
- void io_uring_service::init_ring()
- {
- int result = ::io_uring_queue_init(ring_size, &ring_, 0);
- if (result < 0)
- {
- ring_.ring_fd = -1;
- asio::error_code ec(-result,
- asio::error::get_system_category());
- asio::detail::throw_error(ec, "io_uring_queue_init");
- }
- #if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
- event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
- if (event_fd_ < 0)
- {
- asio::error_code ec(-result,
- asio::error::get_system_category());
- ::io_uring_queue_exit(&ring_);
- asio::detail::throw_error(ec, "eventfd");
- }
- result = ::io_uring_register_eventfd(&ring_, event_fd_);
- if (result < 0)
- {
- ::close(event_fd_);
- ::io_uring_queue_exit(&ring_);
- asio::error_code ec(-result,
- asio::error::get_system_category());
- asio::detail::throw_error(ec, "io_uring_queue_init");
- }
- #endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
- }
- #if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
- class io_uring_service::event_fd_read_op :
- public reactor_op
- {
- public:
- event_fd_read_op(io_uring_service* s)
- : reactor_op(asio::error_code(),
- &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
- service_(s)
- {
- }
- static status do_perform(reactor_op* base)
- {
- event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
- for (;;)
- {
- // Only perform one read. The kernel maintains an atomic counter.
- uint64_t counter(0);
- errno = 0;
- int bytes_read = ::read(o->service_->event_fd_,
- &counter, sizeof(uint64_t));
- if (bytes_read < 0 && errno == EINTR)
- continue;
- break;
- }
- op_queue<operation> ops;
- o->service_->run(0, ops);
- o->service_->scheduler_.post_deferred_completions(ops);
- return not_done;
- }
- static void do_complete(void* /*owner*/, operation* base,
- const asio::error_code& /*ec*/,
- std::size_t /*bytes_transferred*/)
- {
- event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
- delete o;
- }
- private:
- io_uring_service* service_;
- };
- #endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
- void io_uring_service::register_with_reactor()
- {
- #if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
- reactor_.register_internal_descriptor(reactor::read_op,
- event_fd_, reactor_data_, new event_fd_read_op(this));
- #endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
- }
- io_uring_service::io_object* io_uring_service::allocate_io_object()
- {
- mutex::scoped_lock registration_lock(registration_mutex_);
- return registered_io_objects_.alloc(
- ASIO_CONCURRENCY_HINT_IS_LOCKING(
- REACTOR_IO, scheduler_.concurrency_hint()));
- }
- void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
- {
- mutex::scoped_lock registration_lock(registration_mutex_);
- registered_io_objects_.free(io_obj);
- }
- bool io_uring_service::do_cancel_ops(
- per_io_object_data& io_obj, op_queue<operation>& ops)
- {
- bool cancel_op = false;
- for (int i = 0; i < max_ops; ++i)
- {
- if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
- {
- cancel_op = true;
- io_obj->queues_[i].op_queue_.pop();
- while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
- {
- op->ec_ = asio::error::operation_aborted;
- io_obj->queues_[i].op_queue_.pop();
- ops.push(op);
- }
- io_obj->queues_[i].op_queue_.push(first_op);
- }
- }
- if (cancel_op)
- {
- mutex::scoped_lock lock(mutex_);
- for (int i = 0; i < max_ops; ++i)
- {
- if (!io_obj->queues_[i].op_queue_.empty()
- && !io_obj->queues_[i].cancel_requested_)
- {
- io_obj->queues_[i].cancel_requested_ = true;
- if (::io_uring_sqe* sqe = get_sqe())
- ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
- }
- }
- submit_sqes();
- }
- return cancel_op;
- }
- void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
- {
- mutex::scoped_lock lock(mutex_);
- timer_queues_.insert(&queue);
- }
- void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
- {
- mutex::scoped_lock lock(mutex_);
- timer_queues_.erase(&queue);
- }
- void io_uring_service::update_timeout()
- {
- if (::io_uring_sqe* sqe = get_sqe())
- {
- ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
- ::io_uring_sqe_set_data(sqe, &timer_queues_);
- }
- }
- __kernel_timespec io_uring_service::get_timeout() const
- {
- __kernel_timespec ts;
- long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
- ts.tv_sec = usec / 1000000;
- ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
- return ts;
- }
- ::io_uring_sqe* io_uring_service::get_sqe()
- {
- ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
- if (!sqe)
- {
- submit_sqes();
- sqe = ::io_uring_get_sqe(&ring_);
- }
- if (sqe)
- {
- ::io_uring_sqe_set_data(sqe, 0);
- ++pending_sqes_;
- }
- return sqe;
- }
- void io_uring_service::submit_sqes()
- {
- if (pending_sqes_ != 0)
- {
- int result = ::io_uring_submit(&ring_);
- if (result > 0)
- {
- pending_sqes_ -= result;
- increment(outstanding_work_, result);
- }
- }
- }
- void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
- {
- if (pending_sqes_ >= submit_batch_size)
- {
- submit_sqes();
- }
- else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
- {
- pending_submit_sqes_op_ = true;
- lock.unlock();
- scheduler_.post_immediate_completion(&submit_sqes_op_, false);
- }
- }
- void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
- {
- if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
- {
- pending_submit_sqes_op_ = true;
- ops.push(&submit_sqes_op_);
- scheduler_.compensating_work_started();
- }
- }
- io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
- : operation(&io_uring_service::submit_sqes_op::do_complete),
- service_(s)
- {
- }
- void io_uring_service::submit_sqes_op::do_complete(void* owner, operation* base,
- const asio::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
- {
- if (owner)
- {
- submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
- mutex::scoped_lock lock(o->service_->mutex_);
- o->service_->submit_sqes();
- if (o->service_->pending_sqes_ != 0)
- o->service_->scheduler_.post_immediate_completion(o, true);
- else
- o->service_->pending_submit_sqes_op_ = false;
- }
- }
- io_uring_service::io_queue::io_queue()
- : operation(&io_uring_service::io_queue::do_complete)
- {
- }
- struct io_uring_service::perform_io_cleanup_on_block_exit
- {
- explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
- : service_(s), io_object_to_free_(0), first_op_(0)
- {
- }
- ~perform_io_cleanup_on_block_exit()
- {
- if (io_object_to_free_)
- {
- mutex::scoped_lock lock(service_->mutex_);
- service_->free_io_object(io_object_to_free_);
- }
- if (first_op_)
- {
- // Post the remaining completed operations for invocation.
- if (!ops_.empty())
- service_->scheduler_.post_deferred_completions(ops_);
- // A user-initiated operation has completed, but there's no need to
- // explicitly call work_finished() here. Instead, we'll take advantage of
- // the fact that the scheduler will call work_finished() once we return.
- }
- else
- {
- // No user-initiated operations have completed, so we need to compensate
- // for the work_finished() call that the scheduler will make once this
- // operation returns.
- service_->scheduler_.compensating_work_started();
- }
- }
- io_uring_service* service_;
- io_object* io_object_to_free_;
- op_queue<operation> ops_;
- operation* first_op_;
- };
- operation* io_uring_service::io_queue::perform_io(int result)
- {
- perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
- mutex::scoped_lock io_object_lock(io_object_->mutex_);
- if (result != -ECANCELED || cancel_requested_)
- {
- if (io_uring_operation* op = op_queue_.front())
- {
- if (result < 0)
- {
- op->ec_.assign(-result, asio::error::get_system_category());
- op->bytes_transferred_ = 0;
- }
- else
- {
- op->ec_.assign(0, op->ec_.category());
- op->bytes_transferred_ = static_cast<std::size_t>(result);
- }
- }
- while (io_uring_operation* op = op_queue_.front())
- {
- if (op->perform(io_cleanup.ops_.empty()))
- {
- op_queue_.pop();
- io_cleanup.ops_.push(op);
- }
- else
- break;
- }
- }
- cancel_requested_ = false;
- if (!op_queue_.empty())
- {
- io_uring_service* service = io_object_->service_;
- mutex::scoped_lock lock(service->mutex_);
- if (::io_uring_sqe* sqe = service->get_sqe())
- {
- op_queue_.front()->prepare(sqe);
- ::io_uring_sqe_set_data(sqe, this);
- service->post_submit_sqes_op(lock);
- }
- else
- {
- lock.unlock();
- while (io_uring_operation* op = op_queue_.front())
- {
- op->ec_ = asio::error::no_buffer_space;
- op_queue_.pop();
- io_cleanup.ops_.push(op);
- }
- }
- }
- // The last operation to complete on a shut down object must free it.
- if (io_object_->shutdown_)
- {
- io_cleanup.io_object_to_free_ = io_object_;
- for (int i = 0; i < max_ops; ++i)
- if (!io_object_->queues_[i].op_queue_.empty())
- io_cleanup.io_object_to_free_ = 0;
- }
- // The first operation will be returned for completion now. The others will
- // be posted for later by the io_cleanup object's destructor.
- io_cleanup.first_op_ = io_cleanup.ops_.front();
- io_cleanup.ops_.pop();
- return io_cleanup.first_op_;
- }
- void io_uring_service::io_queue::do_complete(void* owner, operation* base,
- const asio::error_code& ec, std::size_t bytes_transferred)
- {
- if (owner)
- {
- io_queue* io_q = static_cast<io_queue*>(base);
- int result = static_cast<int>(bytes_transferred);
- if (operation* op = io_q->perform_io(result))
- {
- op->complete(owner, ec, 0);
- }
- }
- }
- io_uring_service::io_object::io_object(bool locking)
- : mutex_(locking)
- {
- }
- } // namespace detail
- } // namespace asio
- #include "asio/detail/pop_options.hpp"
- #endif // defined(ASIO_HAS_IO_URING)
- #endif // ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
|