//
// detail/impl/io_uring_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_IO_URING)

#include <cstddef>
#include <sys/eventfd.h>
#include <boost/asio/detail/io_uring_service.hpp>
#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

io_uring_service::io_uring_service(boost::asio::execution_context& ctx)
  : execution_context_service_base<io_uring_service>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
    outstanding_work_(0),
    submit_sqes_op_(this),
    pending_sqes_(0),
    pending_submit_sqes_op_(false),
    shutdown_(false),
    timeout_(),
    registration_mutex_(mutex_.enabled()),
    reactor_(use_service<reactor>(ctx)),
    reactor_data_(),
    event_fd_(-1)
{
  reactor_.init_task();
  init_ring();
  register_with_reactor();
}

io_uring_service::~io_uring_service()
{
  if (ring_.ring_fd != -1)
    ::io_uring_queue_exit(&ring_);
  if (event_fd_ != -1)
    ::close(event_fd_);
}

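// Shut down the service. Any operations still queued on registered I/O
// objects are collected and handed back to the scheduler as abandoned work,
// matching cancel requests are pushed onto the ring, and the loop below then
// drains one completion per unit of outstanding work so that the ring is
// quiesced before it is destroyed.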
void io_uring_service::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  // Cancel all outstanding operations.
  while (io_object* io_obj = registered_io_objects_.first())
  {
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty())
      {
        ops.push(io_obj->queues_[i].op_queue_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    io_obj->shutdown_ = true;
    registered_io_objects_.free(io_obj);
  }

  // Cancel the timeout operation.
  if (::io_uring_sqe* sqe = get_sqe())
    ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
  submit_sqes();

  // Wait for all completions to come back.
  for (; outstanding_work_ > 0; --outstanding_work_)
  {
    ::io_uring_cqe* cqe = 0;
    if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
      break;
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

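// Handle notification of a fork-related event. On fork_prepare, every pending
// per-object operation and the ring-wide timeout are cancelled and the
// resulting completions are drained, with completed I/O queues posted back to
// the scheduler. The parent re-arms its timeout and eventfd registration,
// while the child discards the inherited ring and creates a fresh one.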
void io_uring_service::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  switch (fork_ev)
  {
  case boost::asio::execution_context::fork_prepare:
    {
      // Cancel all outstanding operations. They will be restarted
      // after the fork completes.
      mutex::scoped_lock registration_lock(registration_mutex_);
      for (io_object* io_obj = registered_io_objects_.first();
          io_obj != 0; io_obj = io_obj->next_)
      {
        mutex::scoped_lock io_object_lock(io_obj->mutex_);
        for (int i = 0; i < max_ops; ++i)
        {
          if (!io_obj->queues_[i].op_queue_.empty()
              && !io_obj->queues_[i].cancel_requested_)
          {
            mutex::scoped_lock lock(mutex_);
            if (::io_uring_sqe* sqe = get_sqe())
              ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
          }
        }
      }

      // Cancel the timeout operation.
      {
        mutex::scoped_lock lock(mutex_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
        submit_sqes();
      }

      // Wait for all completions to come back, and post all completed I/O
      // queues to the scheduler. Note that some operations may have already
      // completed, or were explicitly cancelled. All others will be
      // automatically restarted.
      op_queue<operation> ops;
      for (; outstanding_work_ > 0; --outstanding_work_)
      {
        ::io_uring_cqe* cqe = 0;
        if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
          break;
        if (void* ptr = ::io_uring_cqe_get_data(cqe))
        {
          if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
          {
            io_queue* io_q = static_cast<io_queue*>(ptr);
            io_q->set_result(cqe->res);
            ops.push(io_q);
          }
        }
      }
      scheduler_.post_deferred_completions(ops);

      // Restart the eventfd operation.
      register_with_reactor();
    }
    break;

  case boost::asio::execution_context::fork_parent:
    // Restart the timeout and eventfd operations.
    update_timeout();
    register_with_reactor();
    break;

  case boost::asio::execution_context::fork_child:
    {
      // The child process gets a new io_uring instance.
      ::io_uring_queue_exit(&ring_);
      init_ring();
      register_with_reactor();
    }
    break;

  default:
    break;
  }
}

void io_uring_service::init_task()
{
  scheduler_.init_task();
}

void io_uring_service::register_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  io_obj = allocate_io_object();
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }
}

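// Register an I/O object that is used internally by the service itself and
// immediately queue its first operation. The operation's SQE is prepared and
// tagged with the owning io_queue as user data; if no SQE can be obtained
// even after flushing pending submissions, the failure is reported by
// throwing.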
void io_uring_service::register_internal_io_object(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, io_uring_operation* op)
{
  io_obj = allocate_io_object();
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }

  io_obj->queues_[op_type].op_queue_.push(op);
  io_object_lock.unlock();

  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    op->prepare(sqe);
    ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
    post_submit_sqes_op(lock);
  }
  else
  {
    boost::system::error_code ec(ENOBUFS,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_get_sqe");
  }
}

void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
{
  int result = ::io_uring_register_buffers(&ring_, v, n);
  if (result < 0)
  {
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_register_buffers");
  }
}

void io_uring_service::unregister_buffers()
{
  (void)::io_uring_unregister_buffers(&ring_);
}

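// Start a new operation on a registered I/O object. If the relevant queue is
// idle the operation is first attempted speculatively via op->perform(false);
// only if it cannot complete immediately is an SQE prepared and queued for
// submission. If the queue is busy, or no SQE is available, the operation is
// deferred (with -ENOBUFS recorded in the latter case).
//
// Illustrative call sequence from an owning descriptor service (names here
// are hypothetical and shown only to sketch the intended usage):
//
//   per_io_object_data impl;
//   service.register_io_object(impl);
//   // ...
//   service.start_op(op_type, impl, my_io_uring_op, is_continuation);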
void io_uring_service::start_op(int op_type,
    io_uring_service::per_io_object_data& io_obj,
    io_uring_operation* op, bool is_continuation)
{
  if (!io_obj)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (io_obj->shutdown_)
  {
    io_object_lock.unlock();
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (io_obj->queues_[op_type].op_queue_.empty())
  {
    if (op->perform(false))
    {
      io_object_lock.unlock();
      scheduler_.post_immediate_completion(op, is_continuation);
    }
    else
    {
      io_obj->queues_[op_type].op_queue_.push(op);
      io_object_lock.unlock();
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        op->prepare(sqe);
        ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
        scheduler_.work_started();
        post_submit_sqes_op(lock);
      }
      else
      {
        lock.unlock();
        io_obj->queues_[op_type].set_result(-ENOBUFS);
        post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
      }
    }
  }
  else
  {
    io_obj->queues_[op_type].op_queue_.push(op);
    scheduler_.work_started();
  }
}

void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  op_queue<operation> ops;
  do_cancel_ops(io_obj, ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}

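// Cancel only those operations on the given queue that were started with a
// matching cancellation key. The operation at the head of the queue may
// already be in flight on the ring, so it is left in place and an async
// cancel request is submitted for it; all other matching operations are
// completed immediately with operation_aborted.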
void io_uring_service::cancel_ops_by_key(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, void* cancellation_key)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  bool first = true;
  op_queue<operation> ops;
  op_queue<io_uring_operation> other_ops;
  while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
  {
    io_obj->queues_[op_type].op_queue_.pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      if (first)
      {
        other_ops.push(op);
        if (!io_obj->queues_[op_type].cancel_requested_)
        {
          io_obj->queues_[op_type].cancel_requested_ = true;
          mutex::scoped_lock lock(mutex_);
          if (::io_uring_sqe* sqe = get_sqe())
          {
            ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
            submit_sqes();
          }
        }
      }
      else
      {
        op->ec_ = boost::asio::error::operation_aborted;
        ops.push(op);
      }
    }
    else
      other_ops.push(op);
    first = false;
  }
  io_obj->queues_[op_type].op_queue_.push(other_ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}

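// Deregister an I/O object. Any queued operations are cancelled. If cancel
// requests are still pending on the ring, or the service is shutting down,
// io_obj is reset to zero so that the subsequent cleanup_io_object() call
// does not free the object while completions may still reference it.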
void io_uring_service::deregister_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  if (!io_obj->shutdown_)
  {
    op_queue<operation> ops;
    bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
    io_obj->shutdown_ = true;
    io_object_lock.unlock();
    scheduler_.post_deferred_completions(ops);
    if (pending_cancelled_ops)
    {
      // There are still pending operations. Prevent cleanup_io_object from
      // freeing the I/O object and let the last operation to complete free it.
      io_obj = 0;
    }
    else
    {
      // Leave io_obj set so that it will be freed by the subsequent call to
      // cleanup_io_object.
    }
  }
  else
  {
    // We are shutting down, so prevent cleanup_io_object from freeing
    // the I/O object and let the destructor free it instead.
    io_obj = 0;
  }
}

void io_uring_service::cleanup_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (io_obj)
  {
    free_io_object(io_obj);
    io_obj = 0;
  }
}

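// Run the ring once, draining completions into the supplied queue. When a
// finite wait is requested, a private timeout SQE keyed on the local ts
// structure bounds the wait; if the wake-up was not caused by that timeout, a
// timeout_remove is pushed so the stale entry does not fire later. CQEs are
// then consumed in batches of up to complete_batch_size, and expired timers
// are collected and the ring-wide timeout re-armed as needed.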
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0;

  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }

  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);

  if (local_ops > 0)
  {
    if (result != 0 || ::io_uring_cqe_get_data(cqe) != &ts)
    {
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        ::io_uring_sqe_set_data(sqe, &ts);
        submit_sqes();
      }
    }
  }

  bool check_timers = false;
  int count = 0;
  while (result == 0 || local_ops > 0)
  {
    if (result == 0)
    {
      if (void* ptr = ::io_uring_cqe_get_data(cqe))
      {
        if (ptr == this)
        {
          // The io_uring service was interrupted.
        }
        else if (ptr == &timer_queues_)
        {
          check_timers = true;
        }
        else if (ptr == &timeout_)
        {
          check_timers = true;
          timeout_.tv_sec = 0;
          timeout_.tv_nsec = 0;
        }
        else if (ptr == &ts)
        {
          --local_ops;
        }
        else
        {
          io_queue* io_q = static_cast<io_queue*>(ptr);
          io_q->set_result(cqe->res);
          ops.push(io_q);
        }
      }
      ::io_uring_cqe_seen(&ring_, cqe);
      ++count;
    }
    result = (count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }

  decrement(outstanding_work_, count);

  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}

void io_uring_service::interrupt()
{
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_nop(sqe);
    ::io_uring_sqe_set_data(sqe, this);
  }
  submit_sqes();
}

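// Create the io_uring instance. When io_uring is not the default backend, an
// eventfd is also created and registered with the ring so that the reactor
// can be woken whenever completions are posted.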
void io_uring_service::init_ring()
{
  int result = ::io_uring_queue_init(ring_size, &ring_, 0);
  if (result < 0)
  {
    ring_.ring_fd = -1;
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_queue_init");
  }

#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (event_fd_ < 0)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    ::io_uring_queue_exit(&ring_);
    boost::asio::detail::throw_error(ec, "eventfd");
  }

  result = ::io_uring_register_eventfd(&ring_, event_fd_);
  if (result < 0)
  {
    ::close(event_fd_);
    ::io_uring_queue_exit(&ring_);
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_register_eventfd");
  }
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}

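// When io_uring is not the default backend, completions are delivered through
// the registered eventfd. This reactor operation drains the eventfd counter
// and then runs the ring in non-blocking mode, posting any completed
// operations to the scheduler. It always returns not_done so that it remains
// registered with the reactor.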
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
class io_uring_service::event_fd_read_op :
  public reactor_op
{
public:
  event_fd_read_op(io_uring_service* s)
    : reactor_op(boost::system::error_code(),
        &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
      service_(s)
  {
  }

  static status do_perform(reactor_op* base)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));

    for (;;)
    {
      // Only perform one read. The kernel maintains an atomic counter.
      uint64_t counter(0);
      errno = 0;
      int bytes_read = ::read(o->service_->event_fd_,
          &counter, sizeof(uint64_t));
      if (bytes_read < 0 && errno == EINTR)
        continue;
      break;
    }

    op_queue<operation> ops;
    o->service_->run(0, ops);
    o->service_->scheduler_.post_deferred_completions(ops);

    return not_done;
  }

  static void do_complete(void* /*owner*/, operation* base,
      const boost::system::error_code& /*ec*/,
      std::size_t /*bytes_transferred*/)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    delete o;
  }

private:
  io_uring_service* service_;
};
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)

void io_uring_service::register_with_reactor()
{
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  reactor_.register_internal_descriptor(reactor::read_op,
      event_fd_, reactor_data_, new event_fd_read_op(this));
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}

io_uring_service::io_object* io_uring_service::allocate_io_object()
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  return registered_io_objects_.alloc(
      BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  registered_io_objects_.free(io_obj);
}

bool io_uring_service::do_cancel_ops(
    per_io_object_data& io_obj, op_queue<operation>& ops)
{
  bool cancel_op = false;

  for (int i = 0; i < max_ops; ++i)
  {
    if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
    {
      cancel_op = true;
      io_obj->queues_[i].op_queue_.pop();
      while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        io_obj->queues_[i].op_queue_.pop();
        ops.push(op);
      }
      io_obj->queues_[i].op_queue_.push(first_op);
    }
  }

  if (cancel_op)
  {
    mutex::scoped_lock lock(mutex_);
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty()
          && !io_obj->queues_[i].cancel_requested_)
      {
        io_obj->queues_[i].cancel_requested_ = true;
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    submit_sqes();
  }

  return cancel_op;
}

void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void io_uring_service::update_timeout()
{
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
    ::io_uring_sqe_set_data(sqe, &timer_queues_);
  }
}

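// Compute the next ring-wide timeout from the timer queues, capping the wait
// at five minutes. A zero wait duration is replaced with a one-nanosecond
// timeout before being handed to io_uring_prep_timeout, so that the submitted
// timeout SQE still completes promptly.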
__kernel_timespec io_uring_service::get_timeout() const
{
  __kernel_timespec ts;
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  return ts;
}

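// Obtain a submission queue entry, flushing pending entries to the kernel if
// the submission queue is full. Each entry handed out has its user data
// cleared and is counted in pending_sqes_; submit_sqes() later converts that
// count into outstanding work once io_uring_submit() accepts the entries.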
::io_uring_sqe* io_uring_service::get_sqe()
{
  ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
  if (!sqe)
  {
    submit_sqes();
    sqe = ::io_uring_get_sqe(&ring_);
  }
  if (sqe)
  {
    ::io_uring_sqe_set_data(sqe, 0);
    ++pending_sqes_;
  }
  return sqe;
}

void io_uring_service::submit_sqes()
{
  if (pending_sqes_ != 0)
  {
    int result = ::io_uring_submit(&ring_);
    if (result > 0)
    {
      pending_sqes_ -= result;
      increment(outstanding_work_, result);
    }
  }
}

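// Flush or defer submission of pending SQEs. Large batches are submitted
// immediately; smaller batches are handed to the scheduler as a
// submit_sqes_op so that several operations started close together can
// typically be submitted with a single io_uring_submit() call.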
void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
  if (pending_sqes_ >= submit_batch_size)
  {
    submit_sqes();
  }
  else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    lock.unlock();
    scheduler_.post_immediate_completion(&submit_sqes_op_, false);
  }
}

void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
  if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    ops.push(&submit_sqes_op_);
    scheduler_.compensating_work_started();
  }
}

io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
  : operation(&io_uring_service::submit_sqes_op::do_complete),
    service_(s)
{
}

void io_uring_service::submit_sqes_op::do_complete(void* owner, operation* base,
    const boost::system::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
    mutex::scoped_lock lock(o->service_->mutex_);
    o->service_->submit_sqes();
    if (o->service_->pending_sqes_ != 0)
      o->service_->scheduler_.post_immediate_completion(o, true);
    else
      o->service_->pending_submit_sqes_op_ = false;
  }
}

io_uring_service::io_queue::io_queue()
  : operation(&io_uring_service::io_queue::do_complete)
{
}

struct io_uring_service::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
    : service_(s), io_object_to_free_(0), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (io_object_to_free_)
    {
      mutex::scoped_lock lock(service_->mutex_);
      service_->free_io_object(io_object_to_free_);
    }

    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        service_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      service_->scheduler_.compensating_work_started();
    }
  }

  io_uring_service* service_;
  io_object* io_object_to_free_;
  op_queue<operation> ops_;
  operation* first_op_;
};

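// Called when a completion arrives for this queue. The CQE result is applied
// to the operation at the head of the queue, as many queued operations as
// possible are then performed, and any that remain are resubmitted to the
// ring. The first completed operation is returned for immediate invocation,
// while the rest are posted by the cleanup object's destructor.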
operation* io_uring_service::io_queue::perform_io(int result)
{
  perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
  mutex::scoped_lock io_object_lock(io_object_->mutex_);

  if (result != -ECANCELED || cancel_requested_)
  {
    if (io_uring_operation* op = op_queue_.front())
    {
      if (result < 0)
      {
        op->ec_.assign(-result, boost::asio::error::get_system_category());
        op->bytes_transferred_ = 0;
      }
      else
      {
        op->ec_.assign(0, op->ec_.category());
        op->bytes_transferred_ = static_cast<std::size_t>(result);
      }
    }

    while (io_uring_operation* op = op_queue_.front())
    {
      if (op->perform(io_cleanup.ops_.empty()))
      {
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
      else
        break;
    }
  }

  cancel_requested_ = false;

  if (!op_queue_.empty())
  {
    io_uring_service* service = io_object_->service_;
    mutex::scoped_lock lock(service->mutex_);
    if (::io_uring_sqe* sqe = service->get_sqe())
    {
      op_queue_.front()->prepare(sqe);
      ::io_uring_sqe_set_data(sqe, this);
      service->post_submit_sqes_op(lock);
    }
    else
    {
      lock.unlock();
      while (io_uring_operation* op = op_queue_.front())
      {
        op->ec_ = boost::asio::error::no_buffer_space;
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
    }
  }

  // The last operation to complete on a shut down object must free it.
  if (io_object_->shutdown_)
  {
    io_cleanup.io_object_to_free_ = io_object_;
    for (int i = 0; i < max_ops; ++i)
      if (!io_object_->queues_[i].op_queue_.empty())
        io_cleanup.io_object_to_free_ = 0;
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void io_uring_service::io_queue::do_complete(void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    io_queue* io_q = static_cast<io_queue*>(base);
    int result = static_cast<int>(bytes_transferred);
    if (operation* op = io_q->perform_io(result))
    {
      op->complete(owner, ec, 0);
    }
  }
}

io_uring_service::io_object::io_object(bool locking)
  : mutex_(locking)
{
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_IO_URING)

#endif // BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP