// fork.hpp
// Copyright (c) 2023 Klemens D. Morgenstern
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_COBALT_DETAIL_FORK_HPP
  6. #define BOOST_COBALT_DETAIL_FORK_HPP
  7. #include <boost/cobalt/config.hpp>
  8. #include <boost/cobalt/detail/await_result_helper.hpp>
  9. #include <boost/cobalt/detail/util.hpp>
  10. #include <boost/cobalt/this_thread.hpp>
  11. #include <boost/cobalt/unique_handle.hpp>
  12. #if defined(BOOST_COBALT_NO_PMR)
  13. #include <boost/cobalt/detail/monotonic_resource.hpp>
  14. #endif
  15. #include <boost/asio/cancellation_signal.hpp>
  16. #include <boost/intrusive_ptr.hpp>
  17. #include <coroutine>
  18. namespace boost::cobalt::detail
  19. {
  20. struct fork
  21. {
  22. fork() = default;
  23. struct shared_state
  24. {
  25. #if !defined(BOOST_COBALT_NO_PMR)
  26. pmr::monotonic_buffer_resource resource;
  27. template<typename ... Args>
  28. shared_state(Args && ... args)
  29. : resource(std::forward<Args>(args)...,
  30. this_thread::get_default_resource())
  31. {
  32. }
  33. #else
  34. detail::monotonic_resource resource;
  35. template<typename ... Args>
  36. shared_state(Args && ... args)
  37. : resource(std::forward<Args>(args)...)
  38. {
  39. }
  40. #endif
  41. // the coro awaiting the fork statement, e.g. awaiting race
  42. unique_handle<void> coro;
  43. std::size_t use_count = 0u;
  44. friend void intrusive_ptr_add_ref(shared_state * st) {st->use_count++;}
  45. friend void intrusive_ptr_release(shared_state * st)
  46. {
  47. if (st->use_count-- == 1u)
  48. st->coro.reset();
  49. }
  50. bool outstanding_work() {return use_count != 0u;}
  51. const executor * exec = nullptr;
  52. bool wired_up() {return exec != nullptr;}
  53. using executor_type = executor;
  54. const executor_type & get_executor() const
  55. {
  56. BOOST_ASSERT(exec != nullptr);
  57. return *exec;
  58. }
  59. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  60. boost::source_location loc;
  61. #endif
  62. };
  63. template<typename std::size_t BufferSize>
  64. struct static_shared_state : private std::array<char, BufferSize>, shared_state
  65. {
  66. static_shared_state() : shared_state{std::array<char, BufferSize>::data(),
  67. std::array<char, BufferSize>::size()}
  68. {}
  69. };
  /// Tag type: `co_await`ing a value of this type inside a fork coroutine is
  /// routed to `promise_type::await_transform(wired_up_t)`.
  struct wired_up_t {};
  /// Ready-made instance of the tag.
  constexpr static wired_up_t wired_up{};
  72. struct set_transaction_function
  73. {
  74. void * begin_transaction_this = nullptr;
  75. void (*begin_transaction_func)(void*) = nullptr;
  76. template<typename BeginTransaction>
  77. set_transaction_function(BeginTransaction & transaction)
  78. : begin_transaction_this(&transaction)
  79. , begin_transaction_func(
  80. +[](void * ptr)
  81. {
  82. (*static_cast<BeginTransaction*>(ptr))();
  83. })
  84. {
  85. }
  86. };
  87. struct promise_type
  88. {
  89. template<typename State, typename ... Rest>
  90. void * operator new(const std::size_t size, State & st, Rest &&...)
  91. {
  92. return st.resource.allocate(size);
  93. }
  94. void operator delete(void *) noexcept {}
  95. template<typename ... Rest>
  96. promise_type(shared_state & st, Rest & ...)
  97. : state(&st)
  98. {
  99. }
  100. intrusive_ptr<shared_state> state;
  101. asio::cancellation_slot cancel;
  102. using executor_type = executor;
  103. const executor_type & get_executor() const { return state->get_executor(); }
  104. #if defined(BOOST_COBALT_NO_PMR)
  105. using allocator_type = detail::monotonic_allocator<void>;
  106. const allocator_type get_allocator() const { return &state->resource; }
  107. #else
  108. using allocator_type = pmr::polymorphic_allocator<void>;
  109. const allocator_type get_allocator() const { return &state->resource; }
  110. #endif
  111. using cancellation_slot_type = asio::cancellation_slot;
  112. cancellation_slot_type get_cancellation_slot() const { return cancel; }
  113. constexpr static std::suspend_never initial_suspend() noexcept {return {};}
  114. struct final_awaitable
  115. {
  116. promise_type * self;
  117. bool await_ready() noexcept
  118. {
  119. return self->state->use_count != 1u;
  120. }
  121. std::coroutine_handle<void> await_suspend(std::coroutine_handle<promise_type> h) noexcept
  122. {
  123. auto pp = h.promise().state.detach();
  124. #if defined(BOOST_COBALT_NO_SELF_DELETE)
  125. h.promise().~promise_type();
  126. #else
  127. // mem is in a monotonic_resource, this is fine on msvc- gcc doesn't like it though
  128. h.destroy();
  129. #endif
  130. pp->use_count--;
  131. BOOST_ASSERT(pp->use_count == 0u);
  132. if (pp->coro)
  133. return pp->coro.release();
  134. else
  135. return std::noop_coroutine();
  136. }
  137. constexpr static void await_resume() noexcept {}
  138. };
  139. final_awaitable final_suspend() noexcept
  140. {
  141. if (cancel.is_connected())
  142. cancel.clear();
  143. return final_awaitable{this};
  144. }
  145. void return_void()
  146. {
  147. }
  148. template<awaitable<promise_type> Aw>
  149. struct wrapped_awaitable
  150. {
  151. Aw & aw;
  152. constexpr static bool await_ready() noexcept
  153. {
  154. return false;
  155. }
  156. auto await_suspend(std::coroutine_handle<promise_type> h)
  157. {
  158. BOOST_ASSERT(h.promise().state->wired_up());
  159. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  160. if constexpr (requires {aw.await_suspend(h, boost::source_location ());})
  161. return aw.await_suspend(h, h.promise().state->loc);
  162. #endif
  163. return aw.await_suspend(h);
  164. }
  165. auto await_resume()
  166. {
  167. return aw.await_resume();
  168. }
  169. };
  170. template<awaitable<promise_type> Aw>
  171. auto await_transform(Aw & aw)
  172. {
  173. return wrapped_awaitable<Aw>{aw};
  174. }
  175. struct wired_up_awaitable
  176. {
  177. promise_type * promise;
  178. bool await_ready() const noexcept
  179. {
  180. return promise->state->wired_up();
  181. }
  182. void await_suspend(std::coroutine_handle<promise_type>)
  183. {
  184. }
  185. constexpr static void await_resume() noexcept {}
  186. };
  187. auto await_transform(wired_up_t)
  188. {
  189. return wired_up_awaitable{this};
  190. }
  191. auto await_transform(set_transaction_function sf)
  192. {
  193. begin_transaction_this = sf.begin_transaction_this;
  194. begin_transaction_func = sf.begin_transaction_func;
  195. return std::suspend_never();
  196. }
  197. auto await_transform(asio::cancellation_slot slot)
  198. {
  199. this->cancel = slot;
  200. return std::suspend_never();
  201. }
  202. [[noreturn]] void unhandled_exception() noexcept {std::terminate();}
  203. void * begin_transaction_this = nullptr;
  204. void (*begin_transaction_func)(void*) = nullptr;
  205. void begin_transaction()
  206. {
  207. if (begin_transaction_this)
  208. begin_transaction_func(begin_transaction_this);
  209. }
  210. fork get_return_object()
  211. {
  212. return this;
  213. }
  214. };
  215. [[nodiscard]] bool done() const
  216. {
  217. return ! handle_ || handle_.done();
  218. }
  219. auto release() -> std::coroutine_handle<promise_type>
  220. {
  221. return handle_.release();
  222. }
  223. private:
  224. fork(promise_type * pt) : handle_(pt) {}
  225. unique_handle<promise_type> handle_;
  226. };
  227. }
  228. #endif //BOOST_COBALT_DETAIL_FORK_HPP