//
// detail/impl/io_uring_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_IO_URING)

#include <cstddef>
#include <sys/eventfd.h>
#include "asio/detail/io_uring_service.hpp"
#include "asio/detail/reactor_op.hpp"
#include "asio/detail/scheduler.hpp"
#include "asio/detail/throw_error.hpp"
#include "asio/error.hpp"

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

io_uring_service::io_uring_service(asio::execution_context& ctx)
  : execution_context_service_base<io_uring_service>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
    outstanding_work_(0),
    submit_sqes_op_(this),
    pending_sqes_(0),
    pending_submit_sqes_op_(false),
    shutdown_(false),
    timeout_(),
    registration_mutex_(mutex_.enabled()),
    reactor_(use_service<reactor>(ctx)),
    reactor_data_(),
    event_fd_(-1)
{
  reactor_.init_task();
  init_ring();
  register_with_reactor();
}

io_uring_service::~io_uring_service()
{
  if (ring_.ring_fd != -1)
    ::io_uring_queue_exit(&ring_);
  if (event_fd_ != -1)
    ::close(event_fd_);
}

void io_uring_service::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  // Cancel all outstanding operations.
  while (io_object* io_obj = registered_io_objects_.first())
  {
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty())
      {
        ops.push(io_obj->queues_[i].op_queue_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    io_obj->shutdown_ = true;
    registered_io_objects_.free(io_obj);
  }

  // Cancel the timeout operation.
  if (::io_uring_sqe* sqe = get_sqe())
    ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
  submit_sqes();

  // Wait for all completions to come back.
  for (; outstanding_work_ > 0; --outstanding_work_)
  {
    ::io_uring_cqe* cqe = 0;
    if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
      break;
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void io_uring_service::notify_fork(
    asio::execution_context::fork_event fork_ev)
{
  switch (fork_ev)
  {
  case asio::execution_context::fork_prepare:
    {
      // Cancel all outstanding operations. They will be restarted
      // after the fork completes.
      mutex::scoped_lock registration_lock(registration_mutex_);
      for (io_object* io_obj = registered_io_objects_.first();
          io_obj != 0; io_obj = io_obj->next_)
      {
        mutex::scoped_lock io_object_lock(io_obj->mutex_);
        for (int i = 0; i < max_ops; ++i)
        {
          if (!io_obj->queues_[i].op_queue_.empty()
              && !io_obj->queues_[i].cancel_requested_)
          {
            mutex::scoped_lock lock(mutex_);
            if (::io_uring_sqe* sqe = get_sqe())
              ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
          }
        }
      }

      // Cancel the timeout operation.
      {
        mutex::scoped_lock lock(mutex_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
        submit_sqes();
      }

      // Wait for all completions to come back, and post all completed I/O
      // queues to the scheduler. Note that some operations may have already
      // completed, or were explicitly cancelled. All others will be
      // automatically restarted.
      op_queue<operation> ops;
      for (; outstanding_work_ > 0; --outstanding_work_)
      {
        ::io_uring_cqe* cqe = 0;
        if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
          break;
        if (void* ptr = ::io_uring_cqe_get_data(cqe))
        {
          if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
          {
            io_queue* io_q = static_cast<io_queue*>(ptr);
            io_q->set_result(cqe->res);
            ops.push(io_q);
          }
        }
      }
      scheduler_.post_deferred_completions(ops);

      // Restart the eventfd operation.
      register_with_reactor();
    }
    break;

  case asio::execution_context::fork_parent:
    // Restart the timeout and eventfd operations.
    update_timeout();
    register_with_reactor();
    break;

  case asio::execution_context::fork_child:
    {
      // The child process gets a new io_uring instance.
      ::io_uring_queue_exit(&ring_);
      init_ring();
      register_with_reactor();
    }
    break;
  default:
    break;
  }
}

void io_uring_service::init_task()
{
  scheduler_.init_task();
}

void io_uring_service::register_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }
}

void io_uring_service::register_internal_io_object(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, io_uring_operation* op)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }

  io_obj->queues_[op_type].op_queue_.push(op);
  io_object_lock.unlock();

  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    op->prepare(sqe);
    ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
    post_submit_sqes_op(lock);
  }
  else
  {
    asio::error_code ec(ENOBUFS,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_get_sqe");
  }
}

void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
{
  int result = ::io_uring_register_buffers(&ring_, v, n);
  if (result < 0)
  {
    asio::error_code ec(-result,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_register_buffers");
  }
}

void io_uring_service::unregister_buffers()
{
  (void)::io_uring_unregister_buffers(&ring_);
}
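
// Start an asynchronous operation on an I/O object. If no operation of the
// same type is already queued, the operation is first given a chance to
// complete immediately via op->perform(false); only if it cannot complete
// straight away is an SQE prepared and queued with the ring.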
void io_uring_service::start_op(int op_type,
    io_uring_service::per_io_object_data& io_obj,
    io_uring_operation* op, bool is_continuation)
{
  if (!io_obj)
  {
    op->ec_ = asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (io_obj->shutdown_)
  {
    io_object_lock.unlock();
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (io_obj->queues_[op_type].op_queue_.empty())
  {
    if (op->perform(false))
    {
      io_object_lock.unlock();
      scheduler_.post_immediate_completion(op, is_continuation);
    }
    else
    {
      io_obj->queues_[op_type].op_queue_.push(op);
      io_object_lock.unlock();
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        op->prepare(sqe);
        ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
        scheduler_.work_started();
        post_submit_sqes_op(lock);
      }
      else
      {
        lock.unlock();
        io_obj->queues_[op_type].set_result(-ENOBUFS);
        post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
      }
    }
  }
  else
  {
    io_obj->queues_[op_type].op_queue_.push(op);
    scheduler_.work_started();
  }
}

void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  op_queue<operation> ops;
  do_cancel_ops(io_obj, ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}
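
// Cancel only those queued operations that match the given cancellation key.
// If the matching operation is at the head of the queue it may already have
// an SQE in flight, so it is left queued and an asynchronous cancel request
// is submitted for it; any other matches are completed immediately with
// operation_aborted.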
void io_uring_service::cancel_ops_by_key(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, void* cancellation_key)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  bool first = true;
  op_queue<operation> ops;
  op_queue<io_uring_operation> other_ops;
  while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
  {
    io_obj->queues_[op_type].op_queue_.pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      if (first)
      {
        other_ops.push(op);
        if (!io_obj->queues_[op_type].cancel_requested_)
        {
          io_obj->queues_[op_type].cancel_requested_ = true;
          mutex::scoped_lock lock(mutex_);
          if (::io_uring_sqe* sqe = get_sqe())
          {
            ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
            submit_sqes();
          }
        }
      }
      else
      {
        op->ec_ = asio::error::operation_aborted;
        ops.push(op);
      }
    }
    else
      other_ops.push(op);
    first = false;
  }
  io_obj->queues_[op_type].op_queue_.push(other_ops);

  io_object_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void io_uring_service::deregister_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (!io_obj->shutdown_)
  {
    op_queue<operation> ops;
    bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
    io_obj->shutdown_ = true;
    io_object_lock.unlock();
    scheduler_.post_deferred_completions(ops);
    if (pending_cancelled_ops)
    {
      // There are still pending operations. Prevent cleanup_io_object from
      // freeing the I/O object and let the last operation to complete free it.
      io_obj = 0;
    }
    else
    {
      // Leave io_obj set so that it will be freed by the subsequent call to
      // cleanup_io_object.
    }
  }
  else
  {
    // We are shutting down, so prevent cleanup_io_object from freeing
    // the I/O object and let the destructor free it instead.
    io_obj = 0;
  }
}

void io_uring_service::cleanup_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (io_obj)
  {
    free_io_object(io_obj);
    io_obj = 0;
  }
}
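
// Reap completions from the ring. When a wait timeout is requested, a
// private timeout SQE (keyed on the local timespec) bounds the wait.
// Completions are then drained in batches of up to complete_batch_size, and
// expired timers cause the shared timeout to be re-armed from get_timeout().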
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0;

  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }

  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);

  if (local_ops > 0)
  {
    if (result != 0 || ::io_uring_cqe_get_data(cqe) != &ts)
    {
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        ::io_uring_sqe_set_data(sqe, &ts);
        submit_sqes();
      }
    }
  }

  bool check_timers = false;
  int count = 0;
  while (result == 0 || local_ops > 0)
  {
    if (result == 0)
    {
      if (void* ptr = ::io_uring_cqe_get_data(cqe))
      {
        if (ptr == this)
        {
          // The io_uring service was interrupted.
        }
        else if (ptr == &timer_queues_)
        {
          check_timers = true;
        }
        else if (ptr == &timeout_)
        {
          check_timers = true;
          timeout_.tv_sec = 0;
          timeout_.tv_nsec = 0;
        }
        else if (ptr == &ts)
        {
          --local_ops;
        }
        else
        {
          io_queue* io_q = static_cast<io_queue*>(ptr);
          io_q->set_result(cqe->res);
          ops.push(io_q);
        }
      }
      ::io_uring_cqe_seen(&ring_, cqe);
      ++count;
    }
    result = (count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }

  decrement(outstanding_work_, count);

  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}
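
// Wake up a thread that is blocked in run(). A no-op SQE carrying the
// service pointer as its user data is submitted; run() identifies the
// completion by that pointer and treats it as an interrupt.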
void io_uring_service::interrupt()
{
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_nop(sqe);
    ::io_uring_sqe_set_data(sqe, this);
  }
  submit_sqes();
}

void io_uring_service::init_ring()
{
  int result = ::io_uring_queue_init(ring_size, &ring_, 0);
  if (result < 0)
  {
    ring_.ring_fd = -1;
    asio::error_code ec(-result,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_queue_init");
  }

#if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
  event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (event_fd_ < 0)
  {
    asio::error_code ec(errno,
        asio::error::get_system_category());
    ::io_uring_queue_exit(&ring_);
    asio::detail::throw_error(ec, "eventfd");
  }

  result = ::io_uring_register_eventfd(&ring_, event_fd_);
  if (result < 0)
  {
    ::close(event_fd_);
    ::io_uring_queue_exit(&ring_);
    asio::error_code ec(-result,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_register_eventfd");
  }
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
}
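
// When io_uring is not the default backend, readiness of the ring is
// signalled through an eventfd that init_ring() registers with io_uring.
// The reactor watches that eventfd, and the reactor operation below drains
// it and processes any completions that are ready.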
#if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
class io_uring_service::event_fd_read_op :
  public reactor_op
{
public:
  event_fd_read_op(io_uring_service* s)
    : reactor_op(asio::error_code(),
        &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
      service_(s)
  {
  }

  static status do_perform(reactor_op* base)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));

    for (;;)
    {
      // Only perform one read. The kernel maintains an atomic counter.
      uint64_t counter(0);
      errno = 0;
      int bytes_read = ::read(o->service_->event_fd_,
          &counter, sizeof(uint64_t));
      if (bytes_read < 0 && errno == EINTR)
        continue;
      break;
    }

    op_queue<operation> ops;
    o->service_->run(0, ops);
    o->service_->scheduler_.post_deferred_completions(ops);

    return not_done;
  }

  static void do_complete(void* /*owner*/, operation* base,
      const asio::error_code& /*ec*/,
      std::size_t /*bytes_transferred*/)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    delete o;
  }

private:
  io_uring_service* service_;
};
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)

void io_uring_service::register_with_reactor()
{
#if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
  reactor_.register_internal_descriptor(reactor::read_op,
      event_fd_, reactor_data_, new event_fd_read_op(this));
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
}

io_uring_service::io_object* io_uring_service::allocate_io_object()
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  return registered_io_objects_.alloc(
      ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  registered_io_objects_.free(io_obj);
}
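
// Cancel every operation queued on an I/O object. The operation at the head
// of each per-type queue may already have an SQE in flight, so it is left in
// place and an asynchronous cancel request is submitted for it; all other
// queued operations are completed immediately with operation_aborted.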
bool io_uring_service::do_cancel_ops(
    per_io_object_data& io_obj, op_queue<operation>& ops)
{
  bool cancel_op = false;

  for (int i = 0; i < max_ops; ++i)
  {
    if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
    {
      cancel_op = true;
      io_obj->queues_[i].op_queue_.pop();
      while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
      {
        op->ec_ = asio::error::operation_aborted;
        io_obj->queues_[i].op_queue_.pop();
        ops.push(op);
      }
      io_obj->queues_[i].op_queue_.push(first_op);
    }
  }

  if (cancel_op)
  {
    mutex::scoped_lock lock(mutex_);
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty()
          && !io_obj->queues_[i].cancel_requested_)
      {
        io_obj->queues_[i].cancel_requested_ = true;
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    submit_sqes();
  }

  return cancel_op;
}

void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}
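
// Remove the in-flight timeout SQE so that run() will re-evaluate the timer
// queues and arm a fresh timeout reflecting the current earliest expiry.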
void io_uring_service::update_timeout()
{
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
    ::io_uring_sqe_set_data(sqe, &timer_queues_);
  }
}

__kernel_timespec io_uring_service::get_timeout() const
{
  __kernel_timespec ts;
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  return ts;
}
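
// Obtain a submission queue entry, flushing any pending entries and retrying
// once if the submission queue is full. Entries handed out are counted in
// pending_sqes_ so that submit_sqes() can account for the outstanding work.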
::io_uring_sqe* io_uring_service::get_sqe()
{
  ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
  if (!sqe)
  {
    submit_sqes();
    sqe = ::io_uring_get_sqe(&ring_);
  }
  if (sqe)
  {
    ::io_uring_sqe_set_data(sqe, 0);
    ++pending_sqes_;
  }
  return sqe;
}

void io_uring_service::submit_sqes()
{
  if (pending_sqes_ != 0)
  {
    int result = ::io_uring_submit(&ring_);
    if (result > 0)
    {
      pending_sqes_ -= result;
      increment(outstanding_work_, result);
    }
  }
}
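
// Submission of prepared SQEs is batched: once submit_batch_size entries have
// accumulated they are flushed immediately, otherwise a submit_sqes_op is
// posted to the scheduler so that a single io_uring_submit() call can cover
// several operations.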
void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
  if (pending_sqes_ >= submit_batch_size)
  {
    submit_sqes();
  }
  else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    lock.unlock();
    scheduler_.post_immediate_completion(&submit_sqes_op_, false);
  }
}

void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
  if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    ops.push(&submit_sqes_op_);
    scheduler_.compensating_work_started();
  }
}

io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
  : operation(&io_uring_service::submit_sqes_op::do_complete),
    service_(s)
{
}

void io_uring_service::submit_sqes_op::do_complete(void* owner, operation* base,
    const asio::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
    mutex::scoped_lock lock(o->service_->mutex_);
    o->service_->submit_sqes();
    if (o->service_->pending_sqes_ != 0)
      o->service_->scheduler_.post_immediate_completion(o, true);
    else
      o->service_->pending_submit_sqes_op_ = false;
  }
}

io_uring_service::io_queue::io_queue()
  : operation(&io_uring_service::io_queue::do_complete)
{
}
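
// Helper that runs when io_queue::perform_io() exits. It frees the I/O
// object if requested, posts any additional completed operations, and keeps
// the scheduler's work count balanced when no user operation has completed.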
struct io_uring_service::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
    : service_(s), io_object_to_free_(0), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (io_object_to_free_)
    {
      mutex::scoped_lock lock(service_->mutex_);
      service_->free_io_object(io_object_to_free_);
    }

    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        service_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      service_->scheduler_.compensating_work_started();
    }
  }

  io_uring_service* service_;
  io_object* io_object_to_free_;
  op_queue<operation> ops_;
  operation* first_op_;
};
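
// Apply a completion result to the operations queued for one operation type.
// As many queued operations as are able to complete are gathered up; if work
// remains, a new SQE is prepared for the operation now at the head of the
// queue. The first completed operation is returned for immediate invocation
// and the rest are posted by the cleanup helper above.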
operation* io_uring_service::io_queue::perform_io(int result)
{
  perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
  mutex::scoped_lock io_object_lock(io_object_->mutex_);

  if (result != -ECANCELED || cancel_requested_)
  {
    if (io_uring_operation* op = op_queue_.front())
    {
      if (result < 0)
      {
        op->ec_.assign(-result, asio::error::get_system_category());
        op->bytes_transferred_ = 0;
      }
      else
      {
        op->ec_.assign(0, op->ec_.category());
        op->bytes_transferred_ = static_cast<std::size_t>(result);
      }
    }

    while (io_uring_operation* op = op_queue_.front())
    {
      if (op->perform(io_cleanup.ops_.empty()))
      {
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
      else
        break;
    }
  }

  cancel_requested_ = false;

  if (!op_queue_.empty())
  {
    io_uring_service* service = io_object_->service_;
    mutex::scoped_lock lock(service->mutex_);
    if (::io_uring_sqe* sqe = service->get_sqe())
    {
      op_queue_.front()->prepare(sqe);
      ::io_uring_sqe_set_data(sqe, this);
      service->post_submit_sqes_op(lock);
    }
    else
    {
      lock.unlock();
      while (io_uring_operation* op = op_queue_.front())
      {
        op->ec_ = asio::error::no_buffer_space;
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
    }
  }

  // The last operation to complete on a shut down object must free it.
  if (io_object_->shutdown_)
  {
    io_cleanup.io_object_to_free_ = io_object_;
    for (int i = 0; i < max_ops; ++i)
      if (!io_object_->queues_[i].op_queue_.empty())
        io_cleanup.io_object_to_free_ = 0;
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void io_uring_service::io_queue::do_complete(void* owner, operation* base,
    const asio::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    io_queue* io_q = static_cast<io_queue*>(base);
    int result = static_cast<int>(bytes_transferred);
    if (operation* op = io_q->perform_io(result))
    {
      op->complete(owner, ec, 0);
    }
  }
}

io_uring_service::io_object::io_object(bool locking)
  : mutex_(locking)
{
}

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_IO_URING)

#endif // ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP