basic_concurrent_channel.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. //
  2. // experimental/basic_concurrent_channel.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP
  11. #define ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #include "asio/detail/non_const_lvalue.hpp"
  17. #include "asio/detail/mutex.hpp"
  18. #include "asio/execution/executor.hpp"
  19. #include "asio/execution_context.hpp"
  20. #include "asio/experimental/detail/channel_send_functions.hpp"
  21. #include "asio/experimental/detail/channel_service.hpp"
  22. #include "asio/detail/push_options.hpp"
  23. namespace asio {
  24. namespace experimental {
  25. namespace detail {
  26. } // namespace detail
  27. /// A channel for messages.
  28. /**
  29. * The basic_concurrent_channel class template is used for sending messages
  30. * between different parts of the same application. A <em>message</em> is
  31. * defined as a collection of arguments to be passed to a completion handler,
  32. * and the set of messages supported by a channel is specified by its @c Traits
  33. * and <tt>Signatures...</tt> template parameters. Messages may be sent and
  34. * received using asynchronous or non-blocking synchronous operations.
  35. *
  36. * Unless customising the traits, applications will typically use the @c
  37. * experimental::concurrent_channel alias template. For example:
  38. * @code void send_loop(int i, steady_timer& timer,
  39. * concurrent_channel<void(error_code, int)>& ch)
  40. * {
  41. * if (i < 10)
  42. * {
  43. * timer.expires_after(chrono::seconds(1));
  44. * timer.async_wait(
  45. * [i, &timer, &ch](error_code error)
  46. * {
  47. * if (!error)
  48. * {
  49. * ch.async_send(error_code(), i,
  50. * [i, &timer, &ch](error_code error)
  51. * {
  52. * if (!error)
  53. * {
  54. * send_loop(i + 1, timer, ch);
  55. * }
  56. * });
  57. * }
  58. * });
  59. * }
  60. * else
  61. * {
  62. * ch.close();
  63. * }
  64. * }
  65. *
  66. * void receive_loop(concurrent_channel<void(error_code, int)>& ch)
  67. * {
  68. * ch.async_receive(
  69. * [&ch](error_code error, int i)
  70. * {
  71. * if (!error)
  72. * {
  73. * std::cout << "Received " << i << "\n";
  74. * receive_loop(ch);
  75. * }
  76. * });
  77. * } @endcode
  78. *
  79. * @par Thread Safety
  80. * @e Distinct @e objects: Safe.@n
  81. * @e Shared @e objects: Safe.
  82. *
  83. * The basic_concurrent_channel class template is thread-safe, and would
  84. * typically be used for passing messages between application code that runs on
  85. * different threads. Consider using @ref basic_channel, and its alias template
  86. * @c experimental::channel, to pass messages between code running in a single
  87. * thread or on the same strand.
  88. */
  89. template <typename Executor, typename Traits, typename... Signatures>
  90. class basic_concurrent_channel
  91. #if !defined(GENERATING_DOCUMENTATION)
  92. : public detail::channel_send_functions<
  93. basic_concurrent_channel<Executor, Traits, Signatures...>,
  94. Executor, Signatures...>
  95. #endif // !defined(GENERATING_DOCUMENTATION)
  96. {
  97. private:
  98. class initiate_async_send;
  99. class initiate_async_receive;
  100. typedef detail::channel_service<asio::detail::mutex> service_type;
  101. typedef typename service_type::template implementation_type<
  102. Traits, Signatures...>::payload_type payload_type;
  103. template <typename... PayloadSignatures,
  104. ASIO_COMPLETION_TOKEN_FOR(PayloadSignatures...) CompletionToken>
  105. auto do_async_receive(detail::channel_payload<PayloadSignatures...>*,
  106. CompletionToken&& token)
  107. -> decltype(
  108. async_initiate<CompletionToken, PayloadSignatures...>(
  109. declval<initiate_async_receive>(), token))
  110. {
  111. return async_initiate<CompletionToken, PayloadSignatures...>(
  112. initiate_async_receive(this), token);
  113. }
  114. public:
  115. /// The type of the executor associated with the channel.
  116. typedef Executor executor_type;
  117. /// Rebinds the channel type to another executor.
  118. template <typename Executor1>
  119. struct rebind_executor
  120. {
  121. /// The channel type when rebound to the specified executor.
  122. typedef basic_concurrent_channel<Executor1, Traits, Signatures...> other;
  123. };
  124. /// The traits type associated with the channel.
  125. typedef typename Traits::template rebind<Signatures...>::other traits_type;
  126. /// Construct a basic_concurrent_channel.
  127. /**
  128. * This constructor creates and channel.
  129. *
  130. * @param ex The I/O executor that the channel will use, by default, to
  131. * dispatch handlers for any asynchronous operations performed on the channel.
  132. *
  133. * @param max_buffer_size The maximum number of messages that may be buffered
  134. * in the channel.
  135. */
  136. basic_concurrent_channel(const executor_type& ex,
  137. std::size_t max_buffer_size = 0)
  138. : service_(&asio::use_service<service_type>(
  139. basic_concurrent_channel::get_context(ex))),
  140. impl_(),
  141. executor_(ex)
  142. {
  143. service_->construct(impl_, max_buffer_size);
  144. }
  145. /// Construct and open a basic_concurrent_channel.
  146. /**
  147. * This constructor creates and opens a channel.
  148. *
  149. * @param context An execution context which provides the I/O executor that
  150. * the channel will use, by default, to dispatch handlers for any asynchronous
  151. * operations performed on the channel.
  152. *
  153. * @param max_buffer_size The maximum number of messages that may be buffered
  154. * in the channel.
  155. */
  156. template <typename ExecutionContext>
  157. basic_concurrent_channel(ExecutionContext& context,
  158. std::size_t max_buffer_size = 0,
  159. constraint_t<
  160. is_convertible<ExecutionContext&, execution_context&>::value,
  161. defaulted_constraint
  162. > = defaulted_constraint())
  163. : service_(&asio::use_service<service_type>(context)),
  164. impl_(),
  165. executor_(context.get_executor())
  166. {
  167. service_->construct(impl_, max_buffer_size);
  168. }
  169. /// Move-construct a basic_concurrent_channel from another.
  170. /**
  171. * This constructor moves a channel from one object to another.
  172. *
  173. * @param other The other basic_concurrent_channel object from which the move
  174. * will occur.
  175. *
  176. * @note Following the move, the moved-from object is in the same state as if
  177. * constructed using the @c basic_concurrent_channel(const executor_type&)
  178. * constructor.
  179. */
  180. basic_concurrent_channel(basic_concurrent_channel&& other)
  181. : service_(other.service_),
  182. executor_(other.executor_)
  183. {
  184. service_->move_construct(impl_, other.impl_);
  185. }
  186. /// Move-assign a basic_concurrent_channel from another.
  187. /**
  188. * This assignment operator moves a channel from one object to another.
  189. * Cancels any outstanding asynchronous operations associated with the target
  190. * object.
  191. *
  192. * @param other The other basic_concurrent_channel object from which the move
  193. * will occur.
  194. *
  195. * @note Following the move, the moved-from object is in the same state as if
  196. * constructed using the @c basic_concurrent_channel(const executor_type&)
  197. * constructor.
  198. */
  199. basic_concurrent_channel& operator=(basic_concurrent_channel&& other)
  200. {
  201. if (this != &other)
  202. {
  203. service_->move_assign(impl_, *other.service_, other.impl_);
  204. executor_.~executor_type();
  205. new (&executor_) executor_type(other.executor_);
  206. service_ = other.service_;
  207. }
  208. return *this;
  209. }
  210. // All channels have access to each other's implementations.
  211. template <typename, typename, typename...>
  212. friend class basic_concurrent_channel;
  213. /// Move-construct a basic_concurrent_channel from another.
  214. /**
  215. * This constructor moves a channel from one object to another.
  216. *
  217. * @param other The other basic_concurrent_channel object from which the move
  218. * will occur.
  219. *
  220. * @note Following the move, the moved-from object is in the same state as if
  221. * constructed using the @c basic_concurrent_channel(const executor_type&)
  222. * constructor.
  223. */
  224. template <typename Executor1>
  225. basic_concurrent_channel(
  226. basic_concurrent_channel<Executor1, Traits, Signatures...>&& other,
  227. constraint_t<
  228. is_convertible<Executor1, Executor>::value
  229. > = 0)
  230. : service_(other.service_),
  231. executor_(other.executor_)
  232. {
  233. service_->move_construct(impl_, other.impl_);
  234. }
  235. /// Move-assign a basic_concurrent_channel from another.
  236. /**
  237. * This assignment operator moves a channel from one object to another.
  238. * Cancels any outstanding asynchronous operations associated with the target
  239. * object.
  240. *
  241. * @param other The other basic_concurrent_channel object from which the move
  242. * will occur.
  243. *
  244. * @note Following the move, the moved-from object is in the same state as if
  245. * constructed using the @c basic_concurrent_channel(const executor_type&)
  246. * constructor.
  247. */
  248. template <typename Executor1>
  249. constraint_t<
  250. is_convertible<Executor1, Executor>::value,
  251. basic_concurrent_channel&
  252. > operator=(
  253. basic_concurrent_channel<Executor1, Traits, Signatures...>&& other)
  254. {
  255. if (this != &other)
  256. {
  257. service_->move_assign(impl_, *other.service_, other.impl_);
  258. executor_.~executor_type();
  259. new (&executor_) executor_type(other.executor_);
  260. service_ = other.service_;
  261. }
  262. return *this;
  263. }
  264. /// Destructor.
  265. ~basic_concurrent_channel()
  266. {
  267. service_->destroy(impl_);
  268. }
  269. /// Get the executor associated with the object.
  270. const executor_type& get_executor() noexcept
  271. {
  272. return executor_;
  273. }
  274. /// Get the capacity of the channel's buffer.
  275. std::size_t capacity() noexcept
  276. {
  277. return service_->capacity(impl_);
  278. }
  279. /// Determine whether the channel is open.
  280. bool is_open() const noexcept
  281. {
  282. return service_->is_open(impl_);
  283. }
  284. /// Reset the channel to its initial state.
  285. void reset()
  286. {
  287. service_->reset(impl_);
  288. }
  289. /// Close the channel.
  290. void close()
  291. {
  292. service_->close(impl_);
  293. }
  294. /// Cancel all asynchronous operations waiting on the channel.
  295. /**
  296. * All outstanding send operations will complete with the error
  297. * @c asio::experimental::error::channel_cancelled. Outstanding receive
  298. * operations complete with the result as determined by the channel traits.
  299. */
  300. void cancel()
  301. {
  302. service_->cancel(impl_);
  303. }
  304. /// Determine whether a message can be received without blocking.
  305. bool ready() const noexcept
  306. {
  307. return service_->ready(impl_);
  308. }
  309. #if defined(GENERATING_DOCUMENTATION)
  310. /// Try to send a message without blocking.
  311. /**
  312. * Fails if the buffer is full and there are no waiting receive operations.
  313. *
  314. * @returns @c true on success, @c false on failure.
  315. */
  316. template <typename... Args>
  317. bool try_send(Args&&... args);
  318. /// Try to send a message without blocking, using dispatch semantics to call
  319. /// the receive operation's completion handler.
  320. /**
  321. * Fails if the buffer is full and there are no waiting receive operations.
  322. *
  323. * The receive operation's completion handler may be called from inside this
  324. * function.
  325. *
  326. * @returns @c true on success, @c false on failure.
  327. */
  328. template <typename... Args>
  329. bool try_send_via_dispatch(Args&&... args);
  330. /// Try to send a number of messages without blocking.
  331. /**
  332. * @returns The number of messages that were sent.
  333. */
  334. template <typename... Args>
  335. std::size_t try_send_n(std::size_t count, Args&&... args);
  336. /// Try to send a number of messages without blocking, using dispatch
  337. /// semantics to call the receive operations' completion handlers.
  338. /**
  339. * The receive operations' completion handlers may be called from inside this
  340. * function.
  341. *
  342. * @returns The number of messages that were sent.
  343. */
  344. template <typename... Args>
  345. std::size_t try_send_n_via_dispatch(std::size_t count, Args&&... args);
  346. /// Asynchronously send a message.
  347. template <typename... Args,
  348. ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
  349. CompletionToken ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  350. auto async_send(Args&&... args,
  351. CompletionToken&& token);
  352. #endif // defined(GENERATING_DOCUMENTATION)
  353. /// Try to receive a message without blocking.
  354. /**
  355. * Fails if the buffer is full and there are no waiting receive operations.
  356. *
  357. * @returns @c true on success, @c false on failure.
  358. */
  359. template <typename Handler>
  360. bool try_receive(Handler&& handler)
  361. {
  362. return service_->try_receive(impl_, static_cast<Handler&&>(handler));
  363. }
  364. /// Asynchronously receive a message.
  365. template <typename CompletionToken
  366. ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  367. auto async_receive(
  368. CompletionToken&& token
  369. ASIO_DEFAULT_COMPLETION_TOKEN(Executor))
  370. #if !defined(GENERATING_DOCUMENTATION)
  371. -> decltype(
  372. this->do_async_receive(static_cast<payload_type*>(0),
  373. static_cast<CompletionToken&&>(token)))
  374. #endif // !defined(GENERATING_DOCUMENTATION)
  375. {
  376. return this->do_async_receive(static_cast<payload_type*>(0),
  377. static_cast<CompletionToken&&>(token));
  378. }
  379. private:
  380. // Disallow copying and assignment.
  381. basic_concurrent_channel(
  382. const basic_concurrent_channel&) = delete;
  383. basic_concurrent_channel& operator=(
  384. const basic_concurrent_channel&) = delete;
  385. template <typename, typename, typename...>
  386. friend class detail::channel_send_functions;
  387. // Helper function to get an executor's context.
  388. template <typename T>
  389. static execution_context& get_context(const T& t,
  390. enable_if_t<execution::is_executor<T>::value>* = 0)
  391. {
  392. return asio::query(t, execution::context);
  393. }
  394. // Helper function to get an executor's context.
  395. template <typename T>
  396. static execution_context& get_context(const T& t,
  397. enable_if_t<!execution::is_executor<T>::value>* = 0)
  398. {
  399. return t.context();
  400. }
  401. class initiate_async_send
  402. {
  403. public:
  404. typedef Executor executor_type;
  405. explicit initiate_async_send(basic_concurrent_channel* self)
  406. : self_(self)
  407. {
  408. }
  409. const executor_type& get_executor() const noexcept
  410. {
  411. return self_->get_executor();
  412. }
  413. template <typename SendHandler>
  414. void operator()(SendHandler&& handler,
  415. payload_type&& payload) const
  416. {
  417. asio::detail::non_const_lvalue<SendHandler> handler2(handler);
  418. self_->service_->async_send(self_->impl_,
  419. static_cast<payload_type&&>(payload),
  420. handler2.value, self_->get_executor());
  421. }
  422. private:
  423. basic_concurrent_channel* self_;
  424. };
  425. class initiate_async_receive
  426. {
  427. public:
  428. typedef Executor executor_type;
  429. explicit initiate_async_receive(basic_concurrent_channel* self)
  430. : self_(self)
  431. {
  432. }
  433. const executor_type& get_executor() const noexcept
  434. {
  435. return self_->get_executor();
  436. }
  437. template <typename ReceiveHandler>
  438. void operator()(ReceiveHandler&& handler) const
  439. {
  440. asio::detail::non_const_lvalue<ReceiveHandler> handler2(handler);
  441. self_->service_->async_receive(self_->impl_,
  442. handler2.value, self_->get_executor());
  443. }
  444. private:
  445. basic_concurrent_channel* self_;
  446. };
  447. // The service associated with the I/O object.
  448. service_type* service_;
  449. // The underlying implementation of the I/O object.
  450. typename service_type::template implementation_type<
  451. Traits, Signatures...> impl_;
  452. // The associated executor.
  453. Executor executor_;
  454. };
  455. } // namespace experimental
  456. } // namespace asio
  457. #include "asio/detail/pop_options.hpp"
  458. #endif // ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP