channel.hpp 9.8 KB


  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
  8. #define BOOST_COBALT_IMPL_CHANNEL_HPP
  9. #include <boost/cobalt/channel.hpp>
  10. #include <boost/cobalt/result.hpp>
  11. #include <boost/asio/post.hpp>
  12. namespace boost::cobalt
  13. {
  14. #if !defined(BOOST_COBALT_NO_PMR)
  15. template<typename T>
  16. inline channel<T>::channel(
  17. std::size_t limit,
  18. executor executor,
  19. pmr::memory_resource * resource)
  20. : buffer_(limit, pmr::polymorphic_allocator<T>(resource)), executor_(executor) {}
  21. #else
  22. template<typename T>
  23. inline channel<T>::channel(
  24. std::size_t limit,
  25. executor executor)
  26. : buffer_(limit), executor_(executor) {}
  27. #endif
  28. template<typename T>
  29. auto channel<T>::get_executor() -> const executor_type & {return executor_;}
  30. template<typename T>
  31. bool channel<T>::is_open() const {return !is_closed_;}
  32. template<typename T>
  33. channel<T>::~channel()
  34. {
  35. while (!read_queue_.empty())
  36. read_queue_.front().awaited_from.reset();
  37. while (!write_queue_.empty())
  38. write_queue_.front().awaited_from.reset();
  39. }
  40. template<typename T>
  41. void channel<T>::close()
  42. {
  43. is_closed_ = true;
  44. while (!read_queue_.empty())
  45. {
  46. auto & op = read_queue_.front();
  47. op.unlink();
  48. op.cancelled = true;
  49. op.cancel_slot.clear();
  50. if (op.awaited_from)
  51. asio::post(executor_, std::move(op.awaited_from));
  52. }
  53. while (!write_queue_.empty())
  54. {
  55. auto & op = write_queue_.front();
  56. op.unlink();
  57. op.cancelled = true;
  58. op.cancel_slot.clear();
  59. if (op.awaited_from)
  60. asio::post(executor_, std::move(op.awaited_from));
  61. }
  62. }
  63. template<typename T>
  64. struct channel<T>::read_op::cancel_impl
  65. {
  66. read_op * op;
  67. cancel_impl(read_op * op) : op(op) {}
  68. void operator()(asio::cancellation_type)
  69. {
  70. op->cancelled = true;
  71. op->unlink();
  72. if (op->awaited_from)
  73. asio::post(
  74. op->chn->executor_,
  75. std::move(op->awaited_from));
  76. op->cancel_slot.clear();
  77. }
  78. };
  79. template<typename T>
  80. template<typename Promise>
  81. std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
  82. {
  83. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  84. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  85. cancel_slot.emplace<cancel_impl>(this);
  86. if (awaited_from)
  87. boost::throw_exception(std::runtime_error("already-awaited"), loc);
  88. awaited_from.reset(h.address());
  89. // currently nothing to read
  90. if constexpr (requires (Promise p) {p.begin_transaction();})
  91. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  92. if (chn->write_queue_.empty())
  93. {
  94. chn->read_queue_.push_back(*this);
  95. return std::noop_coroutine();
  96. }
  97. else
  98. {
  99. cancel_slot.clear();
  100. auto & op = chn->write_queue_.front();
  101. op.transactional_unlink();
  102. op.direct = true;
  103. if constexpr (std::is_copy_constructible_v<T>)
  104. {
  105. if (op.ref.index() == 0)
  106. direct = std::move(*variant2::get<0>(op.ref));
  107. else
  108. direct = *variant2::get<1>(op.ref);
  109. }
  110. else
  111. direct = std::move(*op.ref);
  112. BOOST_ASSERT(op.awaited_from);
  113. asio::post(chn->executor_, std::move(awaited_from));
  114. return op.awaited_from.release();
  115. }
  116. }
  117. template<typename T>
  118. T channel<T>::read_op::await_resume()
  119. {
  120. return await_resume(as_result_tag{}).value(loc);
  121. }
  122. template<typename T>
  123. std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
  124. {
  125. auto res = await_resume(as_result_tag{});
  126. if (res.has_error())
  127. return {res.error(), T{}};
  128. else
  129. return {system::error_code{}, std::move(*res)};
  130. }
  131. template<typename T>
  132. system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
  133. {
  134. if (cancel_slot.is_connected())
  135. cancel_slot.clear();
  136. if (cancelled)
  137. return {system::in_place_error, asio::error::operation_aborted};
  138. T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
  139. if (!direct)
  140. chn->buffer_.pop_front();
  141. if (!chn->write_queue_.empty())
  142. {
  143. auto &op = chn->write_queue_.front();
  144. BOOST_ASSERT(chn->read_queue_.empty());
  145. if (op.await_ready())
  146. {
  147. op.transactional_unlink();
  148. BOOST_ASSERT(op.awaited_from);
  149. asio::post(chn->executor_, std::move(op.awaited_from));
  150. }
  151. }
  152. return {system::in_place_value, std::move(value)};
  153. }
  154. template<typename T>
  155. struct channel<T>::write_op::cancel_impl
  156. {
  157. write_op * op;
  158. cancel_impl(write_op * op) : op(op) {}
  159. void operator()(asio::cancellation_type)
  160. {
  161. op->cancelled = true;
  162. op->unlink();
  163. if (op->awaited_from)
  164. asio::post(
  165. op->chn->executor_, std::move(op->awaited_from));
  166. op->cancel_slot.clear();
  167. }
  168. };
  169. template<typename T>
  170. template<typename Promise>
  171. std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
  172. {
  173. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  174. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  175. cancel_slot.emplace<cancel_impl>(this);
  176. awaited_from.reset(h.address());
  177. if constexpr (requires (Promise p) {p.begin_transaction();})
  178. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  179. // currently nothing to read
  180. if (chn->read_queue_.empty())
  181. {
  182. chn->write_queue_.push_back(*this);
  183. return std::noop_coroutine();
  184. }
  185. else
  186. {
  187. cancel_slot.clear();
  188. auto & op = chn->read_queue_.front();
  189. op.transactional_unlink();
  190. if constexpr (std::is_copy_constructible_v<T>)
  191. {
  192. if (ref.index() == 0)
  193. op.direct.emplace(std::move(*variant2::get<0>(ref)));
  194. else
  195. op.direct.emplace(*variant2::get<1>(ref));
  196. }
  197. else
  198. op.direct.emplace(std::move(*ref));
  199. BOOST_ASSERT(op.awaited_from);
  200. direct = true;
  201. asio::post(chn->executor_, std::move(awaited_from));
  202. return op.awaited_from.release();
  203. }
  204. }
  205. template<typename T>
  206. std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
  207. {
  208. return await_resume(as_result_tag{}).error();
  209. }
  210. template<typename T>
  211. void channel<T>::write_op::await_resume()
  212. {
  213. await_resume(as_result_tag{}).value(loc);
  214. }
  215. template<typename T>
  216. system::result<void> channel<T>::write_op::await_resume(const struct as_result_tag &)
  217. {
  218. if (cancel_slot.is_connected())
  219. cancel_slot.clear();
  220. if (cancelled)
  221. boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
  222. if (!direct)
  223. {
  224. BOOST_ASSERT(!chn->buffer_.full());
  225. if constexpr (std::is_copy_constructible_v<T>)
  226. {
  227. if (ref.index() == 0)
  228. chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
  229. else
  230. chn->buffer_.push_back(*variant2::get<1>(ref));
  231. }
  232. else
  233. chn->buffer_.push_back(std::move(*ref));
  234. }
  235. if (!chn->read_queue_.empty())
  236. {
  237. auto & op = chn->read_queue_.front();
  238. BOOST_ASSERT(chn->write_queue_.empty());
  239. if (op.await_ready())
  240. {
  241. op.transactional_unlink();
  242. BOOST_ASSERT(op.awaited_from);
  243. asio::post(chn->executor_, std::move(op.awaited_from));
  244. }
  245. }
  246. return system::in_place_value;
  247. }
  248. struct channel<void>::read_op::cancel_impl
  249. {
  250. read_op * op;
  251. cancel_impl(read_op * op) : op(op) {}
  252. void operator()(asio::cancellation_type)
  253. {
  254. op->cancelled = true;
  255. op->unlink();
  256. asio::post(op->chn->executor_, std::move(op->awaited_from));
  257. op->cancel_slot.clear();
  258. }
  259. };
  260. struct channel<void>::write_op::cancel_impl
  261. {
  262. write_op * op;
  263. cancel_impl(write_op * op) : op(op) {}
  264. void operator()(asio::cancellation_type)
  265. {
  266. op->cancelled = true;
  267. op->unlink();
  268. asio::post(op->chn->executor_, std::move(op->awaited_from));
  269. op->cancel_slot.clear();
  270. }
  271. };
  272. template<typename Promise>
  273. std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
  274. {
  275. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  276. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  277. cancel_slot.emplace<cancel_impl>(this);
  278. awaited_from.reset(h.address());
  279. if constexpr (requires (Promise p) {p.begin_transaction();})
  280. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  281. // nothing to read currently, enqueue
  282. if (chn->write_queue_.empty())
  283. {
  284. chn->read_queue_.push_back(*this);
  285. return std::noop_coroutine();
  286. }
  287. else // we're good, we can read, so we'll do that, but we need to post, so we need to initialize a transactin.
  288. {
  289. cancel_slot.clear();
  290. auto & op = chn->write_queue_.front();
  291. op.unlink();
  292. op.direct = true;
  293. BOOST_ASSERT(op.awaited_from);
  294. direct = true;
  295. asio::post(chn->executor_, std::move(awaited_from));
  296. return op.awaited_from.release();
  297. }
  298. }
  299. template<typename Promise>
  300. std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
  301. {
  302. if constexpr (requires (Promise p) {p.get_cancellation_slot();})
  303. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  304. cancel_slot.emplace<cancel_impl>(this);
  305. awaited_from.reset(h.address());
  306. // currently nothing to read
  307. if constexpr (requires (Promise p) {p.begin_transaction();})
  308. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  309. if (chn->read_queue_.empty())
  310. {
  311. chn->write_queue_.push_back(*this);
  312. return std::noop_coroutine();
  313. }
  314. else
  315. {
  316. cancel_slot.clear();
  317. auto & op = chn->read_queue_.front();
  318. op.unlink();
  319. op.direct = true;
  320. BOOST_ASSERT(op.awaited_from);
  321. direct = true;
  322. asio::post(chn->executor_, std::move(awaited_from));
  323. return op.awaited_from.release();
  324. }
  325. }
  326. }
  327. #endif //BOOST_COBALT_IMPL_CHANNEL_HPP