race.hpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_DETAIL_RACE_HPP
  8. #define BOOST_COBALT_DETAIL_RACE_HPP
  9. #include <boost/cobalt/detail/await_result_helper.hpp>
  10. #include <boost/cobalt/detail/fork.hpp>
  11. #include <boost/cobalt/detail/handler.hpp>
  12. #include <boost/cobalt/detail/forward_cancellation.hpp>
  13. #include <boost/cobalt/result.hpp>
  14. #include <boost/cobalt/this_thread.hpp>
  15. #include <boost/cobalt/detail/util.hpp>
  16. #include <boost/asio/bind_allocator.hpp>
  17. #include <boost/asio/bind_cancellation_slot.hpp>
  18. #include <boost/asio/bind_executor.hpp>
  19. #include <boost/asio/cancellation_signal.hpp>
  20. #include <boost/asio/associated_cancellation_slot.hpp>
  21. #include <boost/core/no_exceptions_support.hpp>
  22. #include <boost/intrusive_ptr.hpp>
  23. #include <boost/core/demangle.hpp>
  24. #include <boost/core/span.hpp>
  25. #include <boost/variant2/variant.hpp>
  26. #include <coroutine>
  27. #include <optional>
  28. #include <algorithm>
  29. namespace boost::cobalt::detail
  30. {
  31. struct left_race_tag {};
  32. // helpers it determining the type of things;
  33. template<typename Base, // range of aw
  34. typename Awaitable = Base>
  35. struct race_traits
  36. {
  37. // for a ranges race this is based on the range, not the AW in it.
  38. constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;
  39. // what the value is supposed to be cast to before the co_await_operator
  40. using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;
  41. // do we need operator co_await
  42. constexpr static bool is_actual = awaitable_type<awaitable>;
  43. // the type with .await_ functions & interrupt_await
  44. using actual_awaitable
  45. = std::conditional_t<
  46. is_actual,
  47. awaitable,
  48. decltype(get_awaitable_type(std::declval<awaitable>()))>;
  49. // the type to be used with interruptible
  50. using interruptible_type
  51. = std::conditional_t<
  52. std::is_lvalue_reference_v<Base>,
  53. std::decay_t<actual_awaitable> &,
  54. std::decay_t<actual_awaitable> &&>;
  55. constexpr static bool interruptible =
  56. cobalt::interruptible<interruptible_type>;
  57. static void do_interrupt(std::decay_t<actual_awaitable> & aw)
  58. {
  59. if constexpr (interruptible)
  60. static_cast<interruptible_type>(aw).interrupt_await();
  61. }
  62. };
  63. struct interruptible_base
  64. {
  65. virtual void interrupt_await() = 0;
  66. };
  67. template<asio::cancellation_type Ct, typename URBG, typename ... Args>
  68. struct race_variadic_impl
  69. {
  70. template<typename URBG_>
  71. race_variadic_impl(URBG_ && g, Args && ... args)
  72. : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
  73. {
  74. }
  75. std::tuple<Args...> args;
  76. URBG g;
  77. constexpr static std::size_t tuple_size = sizeof...(Args);
  78. struct awaitable : fork::static_shared_state<256 * tuple_size>
  79. {
  80. #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  81. boost::source_location loc;
  82. #endif
  83. template<std::size_t ... Idx>
  84. awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
  85. aws{args}
  86. {
  87. if constexpr (!std::is_same_v<URBG, left_race_tag>)
  88. std::shuffle(impls.begin(), impls.end(), g);
  89. std::fill(working.begin(), working.end(), nullptr);
  90. }
  91. std::tuple<Args...> & aws;
  92. std::array<asio::cancellation_signal, tuple_size> cancel_;
  93. template<typename > constexpr static auto make_null() {return nullptr;};
  94. std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
  95. std::array<interruptible_base*, tuple_size> working;
  96. std::size_t index{std::numeric_limits<std::size_t>::max()};
  97. constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
  98. std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
  99. std::exception_ptr error;
  100. bool has_result() const
  101. {
  102. return index != std::numeric_limits<std::size_t>::max();
  103. }
  104. void cancel_all()
  105. {
  106. interrupt_await();
  107. for (auto i = 0u; i < tuple_size; i++)
  108. if (auto &r = cancel[i]; r)
  109. std::exchange(r, nullptr)->emit(Ct);
  110. }
  111. void interrupt_await()
  112. {
  113. for (auto i : working)
  114. if (i)
  115. i->interrupt_await();
  116. }
  117. template<typename T, typename Error>
  118. void assign_error(system::result<T, Error> & res)
  119. BOOST_TRY
  120. {
  121. std::move(res).value(loc);
  122. }
  123. BOOST_CATCH(...)
  124. {
  125. error = std::current_exception();
  126. }
  127. BOOST_CATCH_END
  128. template<typename T>
  129. void assign_error(system::result<T, std::exception_ptr> & res)
  130. {
  131. error = std::move(res).error();
  132. }
  133. template<std::size_t Idx>
  134. static detail::fork await_impl(awaitable & this_)
  135. BOOST_TRY
  136. {
  137. using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;
  138. typename traits::actual_awaitable aw_{
  139. get_awaitable_type(
  140. static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
  141. )
  142. };
  143. as_result_t aw{aw_};
  144. struct interruptor final : interruptible_base
  145. {
  146. std::decay_t<typename traits::actual_awaitable> & aw;
  147. interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
  148. void interrupt_await() override
  149. {
  150. traits::do_interrupt(aw);
  151. }
  152. };
  153. interruptor in{aw_};
  154. //if constexpr (traits::interruptible)
  155. this_.working[Idx] = &in;
  156. auto transaction = [&this_, idx = Idx] {
  157. if (this_.has_result())
  158. boost::throw_exception(std::runtime_error("Another transaction already started"));
  159. this_.cancel[idx] = nullptr;
  160. // reserve the index early bc
  161. this_.index = idx;
  162. this_.cancel_all();
  163. };
  164. co_await fork::set_transaction_function(transaction);
  165. // check manually if we're ready
  166. auto rd = aw.await_ready();
  167. if (!rd)
  168. {
  169. this_.cancel[Idx] = &this_.cancel_[Idx];
  170. co_await this_.cancel[Idx]->slot();
  171. // make sure the executor is set
  172. co_await detail::fork::wired_up;
  173. // do the await - this doesn't call await-ready again
  174. if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
  175. {
  176. auto res = co_await aw;
  177. if (!this_.has_result())
  178. {
  179. this_.index = Idx;
  180. if (res.has_error())
  181. this_.assign_error(res);
  182. }
  183. if constexpr(!all_void)
  184. if (this_.index == Idx && !res.has_error())
  185. this_.result.emplace(variant2::in_place_index<Idx>);
  186. }
  187. else
  188. {
  189. auto val = co_await aw;
  190. if (!this_.has_result())
  191. this_.index = Idx;
  192. if (this_.index == Idx)
  193. {
  194. if (val.has_error())
  195. this_.assign_error(val);
  196. else
  197. this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
  198. }
  199. }
  200. this_.cancel[Idx] = nullptr;
  201. }
  202. else
  203. {
  204. if (!this_.has_result())
  205. this_.index = Idx;
  206. if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
  207. {
  208. auto res = aw.await_resume();
  209. if (this_.index == Idx)
  210. {
  211. if (res.has_error())
  212. this_.assign_error(res);
  213. else
  214. this_.result.emplace(variant2::in_place_index<Idx>);
  215. }
  216. }
  217. else
  218. {
  219. if (this_.index == Idx)
  220. {
  221. auto res = aw.await_resume();
  222. if (res.has_error())
  223. this_.assign_error(res);
  224. else
  225. this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
  226. }
  227. else
  228. aw.await_resume();
  229. }
  230. this_.cancel[Idx] = nullptr;
  231. }
  232. this_.cancel_all();
  233. this_.working[Idx] = nullptr;
  234. }
  235. BOOST_CATCH(...)
  236. {
  237. if (!this_.has_result())
  238. this_.index = Idx;
  239. if (this_.index == Idx)
  240. this_.error = std::current_exception();
  241. this_.working[Idx] = nullptr;
  242. }
  243. BOOST_CATCH_END
  244. std::array<detail::fork(*)(awaitable&), tuple_size> impls {
  245. []<std::size_t ... Idx>(std::index_sequence<Idx...>)
  246. {
  247. return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
  248. }(std::make_index_sequence<tuple_size>{})
  249. };
  250. detail::fork last_forked;
  251. bool await_ready()
  252. {
  253. last_forked = impls[0](*this);
  254. return last_forked.done();
  255. }
  256. template<typename H>
  257. auto await_suspend(
  258. std::coroutine_handle<H> h,
  259. const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  260. {
  261. this->loc = loc;
  262. this->exec = &cobalt::detail::get_executor(h);
  263. last_forked.release().resume();
  264. if (!this->outstanding_work()) // already done, resume rightaway.
  265. return false;
  266. for (std::size_t idx = 1u;
  267. idx < tuple_size; idx++) // we'
  268. {
  269. auto l = impls[idx](*this);
  270. const auto d = l.done();
  271. l.release();
  272. if (d)
  273. break;
  274. }
  275. if (!this->outstanding_work()) // already done, resume rightaway.
  276. return false;
  277. // arm the cancel
  278. assign_cancellation(
  279. h,
  280. [&](asio::cancellation_type ct)
  281. {
  282. for (auto & cs : cancel)
  283. if (cs)
  284. cs->emit(ct);
  285. });
  286. this->coro.reset(h.address());
  287. return true;
  288. }
  289. #if _MSC_VER
  290. BOOST_NOINLINE
  291. #endif
  292. auto await_resume()
  293. {
  294. if (error)
  295. std::rethrow_exception(error);
  296. if constexpr (all_void)
  297. return index;
  298. else
  299. return std::move(*result);
  300. }
  301. auto await_resume(const as_tuple_tag &)
  302. {
  303. if constexpr (all_void)
  304. return std::make_tuple(error, index);
  305. else
  306. return std::make_tuple(error, std::move(*result));
  307. }
  308. auto await_resume(const as_result_tag & )
  309. -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
  310. {
  311. if (error)
  312. return {system::in_place_error, error};
  313. if constexpr (all_void)
  314. return {system::in_place_value, index};
  315. else
  316. return {system::in_place_value, std::move(*result)};
  317. }
  318. };
  319. awaitable operator co_await() &&
  320. {
  321. return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
  322. }
  323. };
  324. template<asio::cancellation_type Ct, typename URBG, typename Range>
  325. struct race_ranged_impl
  326. {
  327. using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
  328. template<typename URBG_>
  329. race_ranged_impl(URBG_ && g, Range && rng)
  330. : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
  331. {
  332. }
  333. Range range;
  334. URBG g;
  335. struct awaitable : fork::shared_state
  336. {
  337. #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  338. boost::source_location loc;
  339. #endif
  340. using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
  341. using traits = race_traits<Range, type>;
  342. std::size_t index{std::numeric_limits<std::size_t>::max()};
  343. std::conditional_t<
  344. std::is_void_v<result_type>,
  345. variant2::monostate,
  346. std::optional<result_type>> result;
  347. std::exception_ptr error;
  348. #if !defined(BOOST_COBALT_NO_PMR)
  349. pmr::monotonic_buffer_resource res;
  350. pmr::polymorphic_allocator<void> alloc{&resource};
  351. Range &aws;
  352. struct dummy
  353. {
  354. template<typename ... Args>
  355. dummy(Args && ...) {}
  356. };
  357. std::conditional_t<traits::interruptible,
  358. pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
  359. dummy> working{std::size(aws), alloc};
  360. /* all below `reorder` is reordered
  361. *
  362. * cancel[idx] is for aws[reorder[idx]]
  363. */
  364. pmr::vector<std::size_t> reorder{std::size(aws), alloc};
  365. pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
  366. pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
  367. #else
  368. Range &aws;
  369. struct dummy
  370. {
  371. template<typename ... Args>
  372. dummy(Args && ...) {}
  373. };
  374. std::conditional_t<traits::interruptible,
  375. std::vector<std::decay_t<typename traits::actual_awaitable>*>,
  376. dummy> working{std::size(aws), std::allocator<void>()};
  377. /* all below `reorder` is reordered
  378. *
  379. * cancel[idx] is for aws[reorder[idx]]
  380. */
  381. std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
  382. std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
  383. std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
  384. #endif
  385. bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }
  386. awaitable(Range & aws, URBG & g)
  387. : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
  388. , aws(aws)
  389. {
  390. std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
  391. if constexpr (traits::interruptible)
  392. std::fill(working.begin(), working.end(), nullptr);
  393. if constexpr (!std::is_same_v<URBG, left_race_tag>)
  394. std::shuffle(reorder.begin(), reorder.end(), g);
  395. }
  396. void cancel_all()
  397. {
  398. interrupt_await();
  399. for (auto & r : cancel)
  400. if (r)
  401. std::exchange(r, nullptr)->emit(Ct);
  402. }
  403. void interrupt_await()
  404. {
  405. if constexpr (traits::interruptible)
  406. for (auto aw : working)
  407. if (aw)
  408. traits::do_interrupt(*aw);
  409. }
  410. template<typename T, typename Error>
  411. void assign_error(system::result<T, Error> & res)
  412. BOOST_TRY
  413. {
  414. std::move(res).value(loc);
  415. }
  416. BOOST_CATCH(...)
  417. {
  418. error = std::current_exception();
  419. }
  420. BOOST_CATCH_END
  421. template<typename T>
  422. void assign_error(system::result<T, std::exception_ptr> & res)
  423. {
  424. error = std::move(res).error();
  425. }
  426. static detail::fork await_impl(awaitable & this_, std::size_t idx)
  427. BOOST_TRY
  428. {
  429. typename traits::actual_awaitable aw_{
  430. get_awaitable_type(
  431. static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
  432. )};
  433. as_result_t aw{aw_};
  434. if constexpr (traits::interruptible)
  435. this_.working[idx] = &aw_;
  436. auto transaction = [&this_, idx = idx] {
  437. if (this_.has_result())
  438. boost::throw_exception(std::runtime_error("Another transaction already started"));
  439. this_.cancel[idx] = nullptr;
  440. // reserve the index early bc
  441. this_.index = idx;
  442. this_.cancel_all();
  443. };
  444. co_await fork::set_transaction_function(transaction);
  445. // check manually if we're ready
  446. auto rd = aw.await_ready();
  447. if (!rd)
  448. {
  449. this_.cancel[idx] = &this_.cancel_[idx];
  450. co_await this_.cancel[idx]->slot();
  451. // make sure the executor is set
  452. co_await detail::fork::wired_up;
  453. // do the await - this doesn't call await-ready again
  454. if constexpr (std::is_void_v<result_type>)
  455. {
  456. auto res = co_await aw;
  457. if (!this_.has_result())
  458. {
  459. if (res.has_error())
  460. this_.assign_error(res);
  461. this_.index = idx;
  462. }
  463. }
  464. else
  465. {
  466. auto val = co_await aw;
  467. if (!this_.has_result())
  468. this_.index = idx;
  469. if (this_.index == idx)
  470. {
  471. if (val.has_error())
  472. this_.assign_error(val);
  473. else
  474. this_.result.emplace(*std::move(val));
  475. }
  476. }
  477. this_.cancel[idx] = nullptr;
  478. }
  479. else
  480. {
  481. if (!this_.has_result())
  482. this_.index = idx;
  483. if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
  484. {
  485. auto val = aw.await_resume();
  486. if (val.has_error())
  487. this_.assign_error(val);
  488. }
  489. else
  490. {
  491. if (this_.index == idx)
  492. {
  493. auto val = aw.await_resume();
  494. if (val.has_error())
  495. this_.assign_error(val);
  496. else
  497. this_.result.emplace(*std::move(val));
  498. }
  499. else
  500. aw.await_resume();
  501. }
  502. this_.cancel[idx] = nullptr;
  503. }
  504. this_.cancel_all();
  505. if constexpr (traits::interruptible)
  506. this_.working[idx] = nullptr;
  507. }
  508. BOOST_CATCH(...)
  509. {
  510. if (!this_.has_result())
  511. this_.index = idx;
  512. if (this_.index == idx)
  513. this_.error = std::current_exception();
  514. if constexpr (traits::interruptible)
  515. this_.working[idx] = nullptr;
  516. }
  517. BOOST_CATCH_END
  518. detail::fork last_forked;
  519. bool await_ready()
  520. {
  521. last_forked = await_impl(*this, reorder.front());
  522. return last_forked.done();
  523. }
  524. template<typename H>
  525. auto await_suspend(std::coroutine_handle<H> h,
  526. const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  527. {
  528. this->loc = loc;
  529. this->exec = &detail::get_executor(h);
  530. last_forked.release().resume();
  531. if (!this->outstanding_work()) // already done, resume rightaway.
  532. return false;
  533. for (auto itr = std::next(reorder.begin());
  534. itr < reorder.end(); std::advance(itr, 1)) // we'
  535. {
  536. auto l = await_impl(*this, *itr);
  537. auto d = l.done();
  538. l.release();
  539. if (d)
  540. break;
  541. }
  542. if (!this->outstanding_work()) // already done, resume rightaway.
  543. return false;
  544. // arm the cancel
  545. assign_cancellation(
  546. h,
  547. [&](asio::cancellation_type ct)
  548. {
  549. for (auto & cs : cancel)
  550. if (cs)
  551. cs->emit(ct);
  552. });
  553. this->coro.reset(h.address());
  554. return true;
  555. }
  556. #if _MSC_VER
  557. BOOST_NOINLINE
  558. #endif
  559. auto await_resume()
  560. {
  561. if (error)
  562. std::rethrow_exception(error);
  563. if constexpr (std::is_void_v<result_type>)
  564. return index;
  565. else
  566. return std::make_pair(index, *result);
  567. }
  568. auto await_resume(const as_tuple_tag &)
  569. {
  570. if constexpr (std::is_void_v<result_type>)
  571. return std::make_tuple(error, index);
  572. else
  573. return std::make_tuple(error, std::make_pair(index, std::move(*result)));
  574. }
  575. auto await_resume(const as_result_tag & )
  576. -> system::result<result_type, std::exception_ptr>
  577. {
  578. if (error)
  579. return {system::in_place_error, error};
  580. if constexpr (std::is_void_v<result_type>)
  581. return {system::in_place_value, index};
  582. else
  583. return {system::in_place_value, std::make_pair(index, std::move(*result))};
  584. }
  585. };
  586. awaitable operator co_await() &&
  587. {
  588. return awaitable{range, g};
  589. }
  590. };
  591. }
  592. #endif //BOOST_COBALT_DETAIL_RACE_HPP