co_spawn.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. //
  2. // impl/co_spawn.hpp
  3. // ~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_IMPL_CO_SPAWN_HPP
  11. #define ASIO_IMPL_CO_SPAWN_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #include "asio/associated_cancellation_slot.hpp"
  17. #include "asio/awaitable.hpp"
  18. #include "asio/detail/memory.hpp"
  19. #include "asio/detail/recycling_allocator.hpp"
  20. #include "asio/dispatch.hpp"
  21. #include "asio/execution/outstanding_work.hpp"
  22. #include "asio/post.hpp"
  23. #include "asio/prefer.hpp"
  24. #include "asio/use_awaitable.hpp"
  25. #include "asio/detail/push_options.hpp"
  26. namespace asio {
  27. namespace detail {
  28. template <typename Executor, typename = void>
  29. class co_spawn_work_guard
  30. {
  31. public:
  32. typedef decay_t<
  33. prefer_result_t<Executor,
  34. execution::outstanding_work_t::tracked_t
  35. >
  36. > executor_type;
  37. co_spawn_work_guard(const Executor& ex)
  38. : executor_(asio::prefer(ex, execution::outstanding_work.tracked))
  39. {
  40. }
  41. executor_type get_executor() const noexcept
  42. {
  43. return executor_;
  44. }
  45. private:
  46. executor_type executor_;
  47. };
  48. #if !defined(ASIO_NO_TS_EXECUTORS)
  49. template <typename Executor>
  50. struct co_spawn_work_guard<Executor,
  51. enable_if_t<
  52. !execution::is_executor<Executor>::value
  53. >> : executor_work_guard<Executor>
  54. {
  55. co_spawn_work_guard(const Executor& ex)
  56. : executor_work_guard<Executor>(ex)
  57. {
  58. }
  59. };
  60. #endif // !defined(ASIO_NO_TS_EXECUTORS)
  61. template <typename Handler, typename Executor,
  62. typename Function, typename = void>
  63. struct co_spawn_state
  64. {
  65. template <typename H, typename F>
  66. co_spawn_state(H&& h, const Executor& ex, F&& f)
  67. : handler(std::forward<H>(h)),
  68. spawn_work(ex),
  69. handler_work(asio::get_associated_executor(handler, ex)),
  70. function(std::forward<F>(f))
  71. {
  72. }
  73. Handler handler;
  74. co_spawn_work_guard<Executor> spawn_work;
  75. co_spawn_work_guard<associated_executor_t<Handler, Executor>> handler_work;
  76. Function function;
  77. };
  78. template <typename Handler, typename Executor, typename Function>
  79. struct co_spawn_state<Handler, Executor, Function,
  80. enable_if_t<
  81. is_same<
  82. typename associated_executor<Handler,
  83. Executor>::asio_associated_executor_is_unspecialised,
  84. void
  85. >::value
  86. >>
  87. {
  88. template <typename H, typename F>
  89. co_spawn_state(H&& h, const Executor& ex, F&& f)
  90. : handler(std::forward<H>(h)),
  91. handler_work(ex),
  92. function(std::forward<F>(f))
  93. {
  94. }
  95. Handler handler;
  96. co_spawn_work_guard<Executor> handler_work;
  97. Function function;
  98. };
  99. struct co_spawn_dispatch
  100. {
  101. template <typename CompletionToken>
  102. auto operator()(CompletionToken&& token) const
  103. -> decltype(asio::dispatch(std::forward<CompletionToken>(token)))
  104. {
  105. return asio::dispatch(std::forward<CompletionToken>(token));
  106. }
  107. };
  108. struct co_spawn_post
  109. {
  110. template <typename CompletionToken>
  111. auto operator()(CompletionToken&& token) const
  112. -> decltype(asio::post(std::forward<CompletionToken>(token)))
  113. {
  114. return asio::post(std::forward<CompletionToken>(token));
  115. }
  116. };
  117. template <typename T, typename Handler, typename Executor, typename Function>
  118. awaitable<awaitable_thread_entry_point, Executor> co_spawn_entry_point(
  119. awaitable<T, Executor>*, co_spawn_state<Handler, Executor, Function> s)
  120. {
  121. (void) co_await co_spawn_dispatch{};
  122. (co_await awaitable_thread_has_context_switched{}) = false;
  123. std::exception_ptr e = nullptr;
  124. bool done = false;
  125. try
  126. {
  127. T t = co_await s.function();
  128. done = true;
  129. bool switched = (co_await awaitable_thread_has_context_switched{});
  130. if (!switched)
  131. (void) co_await co_spawn_post();
  132. (dispatch)(s.handler_work.get_executor(),
  133. [handler = std::move(s.handler), t = std::move(t)]() mutable
  134. {
  135. std::move(handler)(std::exception_ptr(), std::move(t));
  136. });
  137. co_return;
  138. }
  139. catch (...)
  140. {
  141. if (done)
  142. throw;
  143. e = std::current_exception();
  144. }
  145. bool switched = (co_await awaitable_thread_has_context_switched{});
  146. if (!switched)
  147. (void) co_await co_spawn_post();
  148. (dispatch)(s.handler_work.get_executor(),
  149. [handler = std::move(s.handler), e]() mutable
  150. {
  151. std::move(handler)(e, T());
  152. });
  153. }
  154. template <typename Handler, typename Executor, typename Function>
  155. awaitable<awaitable_thread_entry_point, Executor> co_spawn_entry_point(
  156. awaitable<void, Executor>*, co_spawn_state<Handler, Executor, Function> s)
  157. {
  158. (void) co_await co_spawn_dispatch{};
  159. (co_await awaitable_thread_has_context_switched{}) = false;
  160. std::exception_ptr e = nullptr;
  161. try
  162. {
  163. co_await s.function();
  164. }
  165. catch (...)
  166. {
  167. e = std::current_exception();
  168. }
  169. bool switched = (co_await awaitable_thread_has_context_switched{});
  170. if (!switched)
  171. (void) co_await co_spawn_post();
  172. (dispatch)(s.handler_work.get_executor(),
  173. [handler = std::move(s.handler), e]() mutable
  174. {
  175. std::move(handler)(e);
  176. });
  177. }
  178. template <typename T, typename Executor>
  179. class awaitable_as_function
  180. {
  181. public:
  182. explicit awaitable_as_function(awaitable<T, Executor>&& a)
  183. : awaitable_(std::move(a))
  184. {
  185. }
  186. awaitable<T, Executor> operator()()
  187. {
  188. return std::move(awaitable_);
  189. }
  190. private:
  191. awaitable<T, Executor> awaitable_;
  192. };
  193. template <typename Handler, typename Executor, typename = void>
  194. class co_spawn_cancellation_handler
  195. {
  196. public:
  197. co_spawn_cancellation_handler(const Handler&, const Executor& ex)
  198. : signal_(detail::allocate_shared<cancellation_signal>(
  199. detail::recycling_allocator<cancellation_signal,
  200. detail::thread_info_base::cancellation_signal_tag>())),
  201. ex_(ex)
  202. {
  203. }
  204. cancellation_slot slot()
  205. {
  206. return signal_->slot();
  207. }
  208. void operator()(cancellation_type_t type)
  209. {
  210. shared_ptr<cancellation_signal> sig = signal_;
  211. asio::dispatch(ex_, [sig, type]{ sig->emit(type); });
  212. }
  213. private:
  214. shared_ptr<cancellation_signal> signal_;
  215. Executor ex_;
  216. };
  217. template <typename Handler, typename Executor>
  218. class co_spawn_cancellation_handler<Handler, Executor,
  219. enable_if_t<
  220. is_same<
  221. typename associated_executor<Handler,
  222. Executor>::asio_associated_executor_is_unspecialised,
  223. void
  224. >::value
  225. >>
  226. {
  227. public:
  228. co_spawn_cancellation_handler(const Handler&, const Executor&)
  229. {
  230. }
  231. cancellation_slot slot()
  232. {
  233. return signal_.slot();
  234. }
  235. void operator()(cancellation_type_t type)
  236. {
  237. signal_.emit(type);
  238. }
  239. private:
  240. cancellation_signal signal_;
  241. };
  242. template <typename Executor>
  243. class initiate_co_spawn
  244. {
  245. public:
  246. typedef Executor executor_type;
  247. template <typename OtherExecutor>
  248. explicit initiate_co_spawn(const OtherExecutor& ex)
  249. : ex_(ex)
  250. {
  251. }
  252. executor_type get_executor() const noexcept
  253. {
  254. return ex_;
  255. }
  256. template <typename Handler, typename F>
  257. void operator()(Handler&& handler, F&& f) const
  258. {
  259. typedef result_of_t<F()> awaitable_type;
  260. typedef decay_t<Handler> handler_type;
  261. typedef decay_t<F> function_type;
  262. typedef co_spawn_cancellation_handler<
  263. handler_type, Executor> cancel_handler_type;
  264. auto slot = asio::get_associated_cancellation_slot(handler);
  265. cancel_handler_type* cancel_handler = slot.is_connected()
  266. ? &slot.template emplace<cancel_handler_type>(handler, ex_)
  267. : nullptr;
  268. cancellation_slot proxy_slot(
  269. cancel_handler
  270. ? cancel_handler->slot()
  271. : cancellation_slot());
  272. cancellation_state cancel_state(proxy_slot);
  273. auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr),
  274. co_spawn_state<handler_type, Executor, function_type>(
  275. std::forward<Handler>(handler), ex_, std::forward<F>(f)));
  276. awaitable_handler<executor_type, void>(std::move(a),
  277. ex_, proxy_slot, cancel_state).launch();
  278. }
  279. private:
  280. Executor ex_;
  281. };
  282. } // namespace detail
  283. template <typename Executor, typename T, typename AwaitableExecutor,
  284. ASIO_COMPLETION_TOKEN_FOR(
  285. void(std::exception_ptr, T)) CompletionToken>
  286. inline ASIO_INITFN_AUTO_RESULT_TYPE(
  287. CompletionToken, void(std::exception_ptr, T))
  288. co_spawn(const Executor& ex,
  289. awaitable<T, AwaitableExecutor> a, CompletionToken&& token,
  290. constraint_t<
  291. (is_executor<Executor>::value || execution::is_executor<Executor>::value)
  292. && is_convertible<Executor, AwaitableExecutor>::value
  293. >)
  294. {
  295. return async_initiate<CompletionToken, void(std::exception_ptr, T)>(
  296. detail::initiate_co_spawn<AwaitableExecutor>(AwaitableExecutor(ex)),
  297. token, detail::awaitable_as_function<T, AwaitableExecutor>(std::move(a)));
  298. }
  299. template <typename Executor, typename AwaitableExecutor,
  300. ASIO_COMPLETION_TOKEN_FOR(
  301. void(std::exception_ptr)) CompletionToken>
  302. inline ASIO_INITFN_AUTO_RESULT_TYPE(
  303. CompletionToken, void(std::exception_ptr))
  304. co_spawn(const Executor& ex,
  305. awaitable<void, AwaitableExecutor> a, CompletionToken&& token,
  306. constraint_t<
  307. (is_executor<Executor>::value || execution::is_executor<Executor>::value)
  308. && is_convertible<Executor, AwaitableExecutor>::value
  309. >)
  310. {
  311. return async_initiate<CompletionToken, void(std::exception_ptr)>(
  312. detail::initiate_co_spawn<AwaitableExecutor>(AwaitableExecutor(ex)),
  313. token, detail::awaitable_as_function<
  314. void, AwaitableExecutor>(std::move(a)));
  315. }
  316. template <typename ExecutionContext, typename T, typename AwaitableExecutor,
  317. ASIO_COMPLETION_TOKEN_FOR(
  318. void(std::exception_ptr, T)) CompletionToken>
  319. inline ASIO_INITFN_AUTO_RESULT_TYPE(
  320. CompletionToken, void(std::exception_ptr, T))
  321. co_spawn(ExecutionContext& ctx,
  322. awaitable<T, AwaitableExecutor> a, CompletionToken&& token,
  323. constraint_t<
  324. is_convertible<ExecutionContext&, execution_context&>::value
  325. && is_convertible<typename ExecutionContext::executor_type,
  326. AwaitableExecutor>::value
  327. >)
  328. {
  329. return (co_spawn)(ctx.get_executor(), std::move(a),
  330. std::forward<CompletionToken>(token));
  331. }
  332. template <typename ExecutionContext, typename AwaitableExecutor,
  333. ASIO_COMPLETION_TOKEN_FOR(
  334. void(std::exception_ptr)) CompletionToken>
  335. inline ASIO_INITFN_AUTO_RESULT_TYPE(
  336. CompletionToken, void(std::exception_ptr))
  337. co_spawn(ExecutionContext& ctx,
  338. awaitable<void, AwaitableExecutor> a, CompletionToken&& token,
  339. constraint_t<
  340. is_convertible<ExecutionContext&, execution_context&>::value
  341. && is_convertible<typename ExecutionContext::executor_type,
  342. AwaitableExecutor>::value
  343. >)
  344. {
  345. return (co_spawn)(ctx.get_executor(), std::move(a),
  346. std::forward<CompletionToken>(token));
  347. }
  348. template <typename Executor, typename F,
  349. ASIO_COMPLETION_TOKEN_FOR(typename detail::awaitable_signature<
  350. result_of_t<F()>>::type) CompletionToken>
  351. inline ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken,
  352. typename detail::awaitable_signature<result_of_t<F()>>::type)
  353. co_spawn(const Executor& ex, F&& f, CompletionToken&& token,
  354. constraint_t<
  355. is_executor<Executor>::value || execution::is_executor<Executor>::value
  356. >)
  357. {
  358. return async_initiate<CompletionToken,
  359. typename detail::awaitable_signature<result_of_t<F()>>::type>(
  360. detail::initiate_co_spawn<
  361. typename result_of_t<F()>::executor_type>(ex),
  362. token, std::forward<F>(f));
  363. }
  364. template <typename ExecutionContext, typename F,
  365. ASIO_COMPLETION_TOKEN_FOR(typename detail::awaitable_signature<
  366. result_of_t<F()>>::type) CompletionToken>
  367. inline ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken,
  368. typename detail::awaitable_signature<result_of_t<F()>>::type)
  369. co_spawn(ExecutionContext& ctx, F&& f, CompletionToken&& token,
  370. constraint_t<
  371. is_convertible<ExecutionContext&, execution_context&>::value
  372. >)
  373. {
  374. return (co_spawn)(ctx.get_executor(), std::forward<F>(f),
  375. std::forward<CompletionToken>(token));
  376. }
  377. } // namespace asio
  378. #include "asio/detail/pop_options.hpp"
  379. #endif // ASIO_IMPL_CO_SPAWN_HPP