channel.hpp 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_CHANNEL_HPP
  8. #define BOOST_COBALT_CHANNEL_HPP
  9. #include <boost/cobalt/this_thread.hpp>
  10. #include <boost/cobalt/unique_handle.hpp>
  11. #include <boost/cobalt/detail/util.hpp>
  12. #include <boost/asio/cancellation_signal.hpp>
  13. #include <boost/asio/cancellation_type.hpp>
  14. #include <boost/circular_buffer.hpp>
  15. #include <boost/config.hpp>
  16. #include <boost/intrusive/list.hpp>
  17. #include <boost/variant2/variant.hpp>
  18. #include <optional>
  19. namespace boost::cobalt
  20. {
  21. template<typename T>
  22. struct channel_reader;
  23. // tag::outline[]
  24. template<typename T>
  25. struct channel
  26. {
  27. // end::outline[]
  28. #if defined(BOOST_COBALT_NO_PMR)
  29. channel(std::size_t limit = 0u,
  30. executor executor = this_thread::get_executor());
  31. #else
  32. // tag::outline[]
  33. // create a channel with a buffer limit, executor & resource.
  34. explicit
  35. channel(std::size_t limit = 0u,
  36. executor executor = this_thread::get_executor(),
  37. pmr::memory_resource * resource = this_thread::get_default_resource());
  38. // end::outline[]
  39. #endif
  40. // tag::outline[]
  41. // not movable.
  42. channel(channel && rhs) noexcept = delete;
  43. channel & operator=(channel && lhs) noexcept = delete;
  44. using executor_type = executor;
  45. const executor_type & get_executor();
  46. // Closes the channel
  47. ~channel();
  48. bool is_open() const;
  49. // close the operation, will cancel all pending ops, too
  50. void close();
  51. // end::outline[]
  52. private:
  53. #if !defined(BOOST_COBALT_NO_PMR)
  54. boost::circular_buffer<T, pmr::polymorphic_allocator<T>> buffer_;
  55. #else
  56. boost::circular_buffer<T> buffer_;
  57. #endif
  58. executor_type executor_;
  59. bool is_closed_{false};
  60. struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  61. {
  62. channel * chn;
  63. boost::source_location loc;
  64. bool cancelled = false;
  65. std::optional<T> direct{};
  66. asio::cancellation_slot cancel_slot{};
  67. unique_handle<void> awaited_from{nullptr};
  68. void (*begin_transaction)(void*) = nullptr;
  69. void transactional_unlink()
  70. {
  71. if (begin_transaction)
  72. begin_transaction(awaited_from.get());
  73. this->unlink();
  74. }
  75. struct cancel_impl;
  76. bool await_ready() { return !chn->buffer_.empty(); }
  77. template<typename Promise>
  78. BOOST_NOINLINE
  79. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  80. T await_resume();
  81. std::tuple<system::error_code, T> await_resume(const struct as_tuple_tag & );
  82. system::result<T> await_resume(const struct as_result_tag &);
  83. explicit operator bool() const {return chn && chn->is_open();}
  84. };
  85. struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  86. {
  87. channel * chn;
  88. using ref_t = std::conditional_t<
  89. std::is_copy_constructible_v<T>,
  90. variant2::variant<T*, const T*>,
  91. T*>;
  92. ref_t ref;
  93. boost::source_location loc;
  94. bool cancelled = false, direct = false;
  95. asio::cancellation_slot cancel_slot{};
  96. unique_handle<void> awaited_from{nullptr};
  97. void (*begin_transaction)(void*) = nullptr;
  98. void transactional_unlink()
  99. {
  100. if (begin_transaction)
  101. begin_transaction(awaited_from.get());
  102. this->unlink();
  103. }
  104. struct cancel_impl;
  105. bool await_ready() { return !chn->buffer_.full(); }
  106. template<typename Promise>
  107. BOOST_NOINLINE
  108. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  109. void await_resume();
  110. std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  111. system::result<void> await_resume(const struct as_result_tag &);
  112. explicit operator bool() const {return chn && chn->is_open();}
  113. };
  114. boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
  115. boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
  116. public:
  117. read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
  118. #if defined(BOOST_WINDOWS_API)
  119. BOOST_NOINLINE
  120. #endif
  121. write_op write(const T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  122. requires std::is_copy_constructible_v<T>
  123. {
  124. return write_op{{}, this, &value, loc};
  125. }
  126. #if defined(BOOST_WINDOWS_API)
  127. BOOST_NOINLINE
  128. #endif
  129. write_op write(const T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  130. requires std::is_copy_constructible_v<T>
  131. {
  132. return write_op{{}, this, &value, loc};
  133. }
  134. #if defined(BOOST_WINDOWS_API)
  135. BOOST_NOINLINE
  136. #endif
  137. write_op write( T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  138. {
  139. return write_op{{}, this, &value, loc};
  140. }
  141. #if defined(BOOST_WINDOWS_API)
  142. BOOST_NOINLINE
  143. #endif
  144. write_op write( T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  145. {
  146. return write_op{{}, this, &value, loc};
  147. }
  148. /*
  149. // tag::outline[]
  150. // an awaitable that yields T
  151. using __read_op__ = __unspecified__;
  152. // an awaitable that yields void
  153. using __write_op__ = __unspecified__;
  154. // read a value to a channel
  155. __read_op__ read();
  156. // write a value to the channel
  157. __write_op__ write(const T && value);
  158. __write_op__ write(const T & value);
  159. __write_op__ write( T && value);
  160. __write_op__ write( T & value);
  161. // write a value to the channel if T is void
  162. __write_op__ write(); // end::outline[]
  163. */
  164. // tag::outline[]
  165. };
  166. // end::outline[]
  167. template<>
  168. struct channel<void>
  169. {
  170. explicit
  171. channel(std::size_t limit = 0u,
  172. executor executor = this_thread::get_executor())
  173. : limit_(limit), executor_(executor) {}
  174. channel(channel &&) noexcept = delete;
  175. channel & operator=(channel && lhs) noexcept = delete;
  176. using executor_type = executor;
  177. const executor_type & get_executor() {return executor_;}
  178. BOOST_COBALT_DECL ~channel();
  179. bool is_open() const {return !is_closed_;}
  180. BOOST_COBALT_DECL void close();
  181. private:
  182. std::size_t limit_;
  183. std::size_t n_{0u};
  184. executor_type executor_;
  185. bool is_closed_{false};
  186. struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  187. {
  188. channel * chn;
  189. boost::source_location loc;
  190. bool cancelled = false, direct = false;
  191. asio::cancellation_slot cancel_slot{};
  192. unique_handle<void> awaited_from{nullptr};
  193. void (*begin_transaction)(void*) = nullptr;
  194. void transactional_unlink()
  195. {
  196. if (begin_transaction)
  197. begin_transaction(awaited_from.get());
  198. this->unlink();
  199. }
  200. struct cancel_impl;
  201. bool await_ready() { return (chn->n_ > 0); }
  202. template<typename Promise>
  203. BOOST_NOINLINE
  204. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  205. BOOST_COBALT_DECL void await_resume();
  206. BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  207. BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
  208. explicit operator bool() const {return chn && chn->is_open();}
  209. };
  210. struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  211. {
  212. channel * chn;
  213. boost::source_location loc;
  214. bool cancelled = false, direct = false;
  215. asio::cancellation_slot cancel_slot{};
  216. unique_handle<void> awaited_from{nullptr};
  217. void (*begin_transaction)(void*) = nullptr;
  218. void transactional_unlink()
  219. {
  220. if (begin_transaction)
  221. begin_transaction(awaited_from.get());
  222. this->unlink();
  223. }
  224. struct cancel_impl;
  225. bool await_ready()
  226. {
  227. return chn->n_ < chn->limit_;
  228. }
  229. template<typename Promise>
  230. BOOST_NOINLINE
  231. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  232. BOOST_COBALT_DECL void await_resume();
  233. BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  234. BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
  235. explicit operator bool() const {return chn && chn->is_open();}
  236. };
  237. boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
  238. boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
  239. public:
  240. read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
  241. write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{{}, this, loc}; }
  242. };
  243. template<typename T>
  244. struct channel_reader
  245. {
  246. channel_reader(channel<T> & chan,
  247. const boost::source_location & loc = BOOST_CURRENT_LOCATION) : chan_(&chan), loc_(loc) {}
  248. auto operator co_await ()
  249. {
  250. return chan_->read(loc_);
  251. }
  252. explicit operator bool () const {return chan_ && chan_->is_open();}
  253. private:
  254. channel<T> * chan_;
  255. boost::source_location loc_;
  256. };
  257. }
  258. #include <boost/cobalt/impl/channel.hpp>
  259. #endif //BOOST_COBALT_CHANNEL_HPP