thread.hpp 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_DETAIL_THREAD_HPP
  8. #define BOOST_COBALT_DETAIL_THREAD_HPP
  9. #include <boost/cobalt/config.hpp>
  10. #include <boost/cobalt/detail/forward_cancellation.hpp>
  11. #include <boost/cobalt/detail/handler.hpp>
  12. #include <boost/cobalt/concepts.hpp>
  13. #include <boost/cobalt/op.hpp>
  14. #include <boost/cobalt/this_coro.hpp>
  15. #include <boost/asio/cancellation_signal.hpp>
  16. #include <thread>
  17. namespace boost::cobalt
  18. {
  19. struct as_tuple_tag;
  20. struct as_result_tag;
  21. namespace detail
  22. {
  23. struct thread_promise;
  24. }
  25. struct thread;
  26. namespace detail
  27. {
  28. struct signal_helper_2
  29. {
  30. asio::cancellation_signal signal;
  31. };
  32. struct thread_state
  33. {
  34. asio::io_context ctx{1u};
  35. asio::cancellation_signal signal;
  36. std::mutex mtx;
  37. std::optional<completion_handler<std::exception_ptr>> waitor;
  38. std::atomic<bool> done = false;
  39. };
  40. struct thread_promise : signal_helper_2,
  41. promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>,
  42. promise_throw_if_cancelled_base,
  43. enable_awaitables<thread_promise>,
  44. enable_await_allocator<thread_promise>,
  45. enable_await_executor<thread_promise>,
  46. enable_await_deferred
  47. {
  48. BOOST_COBALT_DECL thread_promise();
  49. struct initial_awaitable
  50. {
  51. bool await_ready() const {return false;}
  52. void await_suspend(std::coroutine_handle<thread_promise> h)
  53. {
  54. h.promise().mtx.unlock();
  55. }
  56. void await_resume() {}
  57. };
  58. auto initial_suspend() noexcept
  59. {
  60. return initial_awaitable{};
  61. }
  62. std::suspend_never final_suspend() noexcept
  63. {
  64. wexec_.reset();
  65. return {};
  66. }
  67. #if !defined(BOOST_NO_EXCEPTIONS)
  68. void unhandled_exception() { throw; }
  69. #endif
  70. void return_void() { }
  71. using executor_type = typename cobalt::executor;
  72. const executor_type & get_executor() const {return *exec_;}
  73. #if !defined(BOOST_COBALT_NO_PMR)
  74. using allocator_type = pmr::polymorphic_allocator<void>;
  75. using resource_type = pmr::unsynchronized_pool_resource;
  76. resource_type * resource;
  77. allocator_type get_allocator() const { return allocator_type(resource); }
  78. #endif
  79. using promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>::await_transform;
  80. using promise_throw_if_cancelled_base::await_transform;
  81. using enable_awaitables<thread_promise>::await_transform;
  82. using enable_await_allocator<thread_promise>::await_transform;
  83. using enable_await_executor<thread_promise>::await_transform;
  84. using enable_await_deferred::await_transform;
  85. BOOST_COBALT_DECL
  86. boost::cobalt::thread get_return_object();
  87. void set_executor(asio::io_context::executor_type exec)
  88. {
  89. wexec_.emplace(exec);
  90. exec_.emplace(exec);
  91. }
  92. std::mutex mtx;
  93. private:
  94. std::optional<asio::executor_work_guard<asio::io_context::executor_type>> wexec_;
  95. std::optional<cobalt::executor> exec_;
  96. };
  97. struct thread_awaitable
  98. {
  99. asio::cancellation_slot cl;
  100. std::optional<std::tuple<std::exception_ptr>> res;
  101. bool await_ready(const boost::source_location & loc = BOOST_CURRENT_LOCATION) const
  102. {
  103. if (state_ == nullptr)
  104. boost::throw_exception(std::invalid_argument("Thread expired"), loc);
  105. std::lock_guard<std::mutex> lock{state_->mtx};
  106. return state_->done;
  107. }
  108. template<typename Promise>
  109. bool await_suspend(std::coroutine_handle<Promise> h)
  110. {
  111. BOOST_ASSERT(state_);
  112. std::lock_guard<std::mutex> lock{state_->mtx};
  113. if (state_->done)
  114. return false;
  115. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  116. if ((cl = h.promise().get_cancellation_slot()).is_connected())
  117. {
  118. cl.assign(
  119. [st = state_](asio::cancellation_type type)
  120. {
  121. std::lock_guard<std::mutex> lock{st->mtx};
  122. asio::post(st->ctx,
  123. [st, type]
  124. {
  125. BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__));
  126. st->signal.emit(type);
  127. });
  128. });
  129. }
  130. state_->waitor.emplace(h, res);
  131. return true;
  132. }
  133. void await_resume()
  134. {
  135. if (cl.is_connected())
  136. cl.clear();
  137. if (thread_)
  138. thread_->join();
  139. if (!res) // await_ready
  140. return;
  141. if (auto ee = std::get<0>(*res))
  142. std::rethrow_exception(ee);
  143. }
  144. system::result<void, std::exception_ptr> await_resume(const as_result_tag &)
  145. {
  146. if (cl.is_connected())
  147. cl.clear();
  148. if (thread_)
  149. thread_->join();
  150. if (!res) // await_ready
  151. return {system::in_place_value};
  152. if (auto ee = std::get<0>(*res))
  153. return {system::in_place_error, std::move(ee)};
  154. return {system::in_place_value};
  155. }
  156. std::tuple<std::exception_ptr> await_resume(const as_tuple_tag &)
  157. {
  158. if (cl.is_connected())
  159. cl.clear();
  160. if (thread_)
  161. thread_->join();
  162. return std::get<0>(*res);
  163. }
  164. explicit thread_awaitable(std::shared_ptr<detail::thread_state> state)
  165. : state_(std::move(state)) {}
  166. explicit thread_awaitable(std::thread thread,
  167. std::shared_ptr<detail::thread_state> state)
  168. : thread_(std::move(thread)), state_(std::move(state)) {}
  169. private:
  170. std::optional<std::thread> thread_;
  171. std::shared_ptr<detail::thread_state> state_;
  172. };
  173. }
  174. }
  175. #endif //BOOST_COBALT_DETAIL_THREAD_HPP