spawn.hpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_DETAIL_SPAWN_HPP
  8. #define BOOST_COBALT_DETAIL_SPAWN_HPP
  9. #include <boost/cobalt/task.hpp>
  10. #include <boost/asio/dispatch.hpp>
  11. #include <boost/smart_ptr/allocate_unique.hpp>
  12. namespace boost::cobalt
  13. {
  14. template<typename T>
  15. struct task;
  16. }
  17. namespace boost::cobalt::detail
  18. {
  19. struct async_initiate_spawn
  20. {
  21. async_initiate_spawn(executor exec) : exec(exec) {}
  22. using executor_type = executor;
  23. const executor_type & get_executor() const {return exec;}
  24. executor exec;
  25. template<typename Handler, typename T>
  26. void operator()(Handler && h, task<T> a)
  27. {
  28. auto & rec = a.receiver_;
  29. if (rec.done)
  30. return asio::dispatch(
  31. asio::get_associated_immediate_executor(h, exec),
  32. asio::append(std::forward<Handler>(h), rec.exception, rec.exception ? T() : *rec.get_result()));
  33. #if !defined(BOOST_COBALT_NO_PMR)
  34. auto dalloc = pmr::polymorphic_allocator<void>{boost::cobalt::this_thread::get_default_resource()};
  35. auto alloc = asio::get_associated_allocator(h, dalloc);
  36. #else
  37. auto alloc = asio::get_associated_allocator(h);
  38. #endif
  39. auto recs = std::allocate_shared<detail::task_receiver<T>>(alloc, std::move(rec));
  40. auto sl = asio::get_associated_cancellation_slot(h);
  41. if (sl.is_connected())
  42. sl.assign(
  43. [ex = exec, recs](asio::cancellation_type ct)
  44. {
  45. asio::dispatch(ex, [recs, ct] {recs->cancel(ct);});
  46. });
  47. auto p = recs.get();
  48. p->promise->exec.emplace(exec);
  49. p->promise->exec_ = exec;
  50. struct completion_handler
  51. {
  52. using allocator_type = std::decay_t<decltype(alloc)>;
  53. allocator_type get_allocator() const { return alloc_; }
  54. allocator_type alloc_;
  55. using executor_type = std::decay_t<decltype(asio::get_associated_executor(h, exec))>;
  56. const executor_type &get_executor() const { return exec_; }
  57. executor_type exec_;
  58. decltype(recs) r;
  59. Handler handler;
  60. void operator()()
  61. {
  62. auto ex = r->exception;
  63. T rr{};
  64. if (r->result)
  65. rr = std::move(*r->result);
  66. r.reset();
  67. std::move(handler)(ex, std::move(rr));
  68. }
  69. };
  70. p->awaited_from.reset(detail::post_coroutine(
  71. completion_handler{
  72. alloc, asio::get_associated_executor(h, exec), std::move(recs), std::move(h)
  73. }).address());
  74. asio::dispatch(exec, std::coroutine_handle<detail::task_promise<T>>::from_promise(*p->promise));
  75. }
  76. template<typename Handler>
  77. void operator()(Handler && h, task<void> a)
  78. {
  79. if (a.receiver_.done)
  80. return asio::dispatch(
  81. asio::get_associated_immediate_executor(h, exec),
  82. asio::append(std::forward<Handler>(h), a.receiver_.exception));
  83. #if !defined(BOOST_COBALT_NO_PMR)
  84. auto alloc = asio::get_associated_allocator(h, pmr::polymorphic_allocator<void>{boost::cobalt::this_thread::get_default_resource()});
  85. #else
  86. auto alloc = asio::get_associated_allocator(h);
  87. #endif
  88. auto recs = std::allocate_shared<detail::task_receiver<void>>(alloc, std::move(a.receiver_));
  89. if (recs->done)
  90. return asio::dispatch(asio::get_associated_immediate_executor(h, exec),
  91. asio::append(std::forward<Handler>(h), recs->exception));
  92. auto sl = asio::get_associated_cancellation_slot(h);
  93. if (sl.is_connected())
  94. sl.assign(
  95. [ex = exec, recs](asio::cancellation_type ct)
  96. {
  97. asio::dispatch(ex, [recs, ct] {recs->cancel(ct);});
  98. });
  99. auto p = recs.get();
  100. p->promise->exec.emplace(exec);
  101. p->promise->exec_ = exec;
  102. struct completion_handler
  103. {
  104. using allocator_type = std::decay_t<decltype(alloc)>;
  105. const allocator_type &get_allocator() const { return alloc_; }
  106. allocator_type alloc_;
  107. using executor_type = std::decay_t<decltype(asio::get_associated_executor(h, exec))>;
  108. const executor_type &get_executor() const { return exec_; }
  109. executor_type exec_;
  110. decltype(recs) r;
  111. Handler handler;
  112. void operator()()
  113. {
  114. auto ex = r->exception;
  115. r.reset();
  116. std::move(handler)(ex);
  117. }
  118. };
  119. p->awaited_from.reset(detail::post_coroutine(completion_handler{
  120. alloc, asio::get_associated_executor(h, exec), std::move(recs), std::forward<Handler>(h)
  121. }).address());
  122. asio::dispatch(exec, std::coroutine_handle<detail::task_promise<void>>::from_promise(*p->promise));
  123. }
  124. };
  125. }
  126. #endif //BOOST_COBALT_DETAIL_SPAWN_HPP