basic_channel.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. //
  2. // experimental/basic_channel.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_EXPERIMENTAL_BASIC_CHANNEL_HPP
  11. #define ASIO_EXPERIMENTAL_BASIC_CHANNEL_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #include "asio/detail/non_const_lvalue.hpp"
  17. #include "asio/detail/null_mutex.hpp"
  18. #include "asio/execution/executor.hpp"
  19. #include "asio/execution_context.hpp"
  20. #include "asio/experimental/detail/channel_send_functions.hpp"
  21. #include "asio/experimental/detail/channel_service.hpp"
  22. #include "asio/detail/push_options.hpp"
  23. namespace asio {
  24. namespace experimental {
  25. namespace detail {
  26. } // namespace detail
  27. /// A channel for messages.
  28. /**
  29. * The basic_channel class template is used for sending messages between
  30. * different parts of the same application. A <em>message</em> is defined as a
  31. * collection of arguments to be passed to a completion handler, and the set of
  32. * messages supported by a channel is specified by its @c Traits and
  33. * <tt>Signatures...</tt> template parameters. Messages may be sent and received
  34. * using asynchronous or non-blocking synchronous operations.
  35. *
  36. * Unless customising the traits, applications will typically use the @c
  37. * experimental::channel alias template. For example:
  38. * @code void send_loop(int i, steady_timer& timer,
  39. * channel<void(error_code, int)>& ch)
  40. * {
  41. * if (i < 10)
  42. * {
  43. * timer.expires_after(chrono::seconds(1));
  44. * timer.async_wait(
  45. * [i, &timer, &ch](error_code error)
  46. * {
  47. * if (!error)
  48. * {
  49. * ch.async_send(error_code(), i,
  50. * [i, &timer, &ch](error_code error)
  51. * {
  52. * if (!error)
  53. * {
  54. * send_loop(i + 1, timer, ch);
  55. * }
  56. * });
  57. * }
  58. * });
  59. * }
  60. * else
  61. * {
  62. * ch.close();
  63. * }
  64. * }
  65. *
  66. * void receive_loop(channel<void(error_code, int)>& ch)
  67. * {
  68. * ch.async_receive(
  69. * [&ch](error_code error, int i)
  70. * {
  71. * if (!error)
  72. * {
  73. * std::cout << "Received " << i << "\n";
  74. * receive_loop(ch);
  75. * }
  76. * });
  77. * } @endcode
  78. *
  79. * @par Thread Safety
  80. * @e Distinct @e objects: Safe.@n
  81. * @e Shared @e objects: Unsafe.
  82. *
  83. * The basic_channel class template is not thread-safe, and would typically be
  84. * used for passing messages between application code that runs on the same
  85. * thread or in the same strand. Consider using @ref basic_concurrent_channel,
  86. * and its alias template @c experimental::concurrent_channel, to pass messages
  87. * between code running in different threads.
  88. */
  89. template <typename Executor, typename Traits, typename... Signatures>
  90. class basic_channel
  91. #if !defined(GENERATING_DOCUMENTATION)
  92. : public detail::channel_send_functions<
  93. basic_channel<Executor, Traits, Signatures...>,
  94. Executor, Signatures...>
  95. #endif // !defined(GENERATING_DOCUMENTATION)
  96. {
  97. private:
  98. class initiate_async_send;
  99. class initiate_async_receive;
  100. typedef detail::channel_service<asio::detail::null_mutex> service_type;
  101. typedef typename service_type::template implementation_type<
  102. Traits, Signatures...>::payload_type payload_type;
  103. template <typename... PayloadSignatures,
  104. ASIO_COMPLETION_TOKEN_FOR(PayloadSignatures...) CompletionToken>
  105. auto do_async_receive(detail::channel_payload<PayloadSignatures...>*,
  106. CompletionToken&& token)
  107. -> decltype(
  108. async_initiate<CompletionToken, PayloadSignatures...>(
  109. declval<initiate_async_receive>(), token))
  110. {
  111. return async_initiate<CompletionToken, PayloadSignatures...>(
  112. initiate_async_receive(this), token);
  113. }
  114. public:
  115. /// The type of the executor associated with the channel.
  116. typedef Executor executor_type;
  117. /// Rebinds the channel type to another executor.
  118. template <typename Executor1>
  119. struct rebind_executor
  120. {
  121. /// The channel type when rebound to the specified executor.
  122. typedef basic_channel<Executor1, Traits, Signatures...> other;
  123. };
  124. /// The traits type associated with the channel.
  125. typedef typename Traits::template rebind<Signatures...>::other traits_type;
  126. /// Construct a basic_channel.
  127. /**
  128. * This constructor creates and channel.
  129. *
  130. * @param ex The I/O executor that the channel will use, by default, to
  131. * dispatch handlers for any asynchronous operations performed on the channel.
  132. *
  133. * @param max_buffer_size The maximum number of messages that may be buffered
  134. * in the channel.
  135. */
  136. basic_channel(const executor_type& ex, std::size_t max_buffer_size = 0)
  137. : service_(&asio::use_service<service_type>(
  138. basic_channel::get_context(ex))),
  139. impl_(),
  140. executor_(ex)
  141. {
  142. service_->construct(impl_, max_buffer_size);
  143. }
  144. /// Construct and open a basic_channel.
  145. /**
  146. * This constructor creates and opens a channel.
  147. *
  148. * @param context An execution context which provides the I/O executor that
  149. * the channel will use, by default, to dispatch handlers for any asynchronous
  150. * operations performed on the channel.
  151. *
  152. * @param max_buffer_size The maximum number of messages that may be buffered
  153. * in the channel.
  154. */
  155. template <typename ExecutionContext>
  156. basic_channel(ExecutionContext& context, std::size_t max_buffer_size = 0,
  157. constraint_t<
  158. is_convertible<ExecutionContext&, execution_context&>::value,
  159. defaulted_constraint
  160. > = defaulted_constraint())
  161. : service_(&asio::use_service<service_type>(context)),
  162. impl_(),
  163. executor_(context.get_executor())
  164. {
  165. service_->construct(impl_, max_buffer_size);
  166. }
  167. /// Move-construct a basic_channel from another.
  168. /**
  169. * This constructor moves a channel from one object to another.
  170. *
  171. * @param other The other basic_channel object from which the move will occur.
  172. *
  173. * @note Following the move, the moved-from object is in the same state as if
  174. * constructed using the @c basic_channel(const executor_type&) constructor.
  175. */
  176. basic_channel(basic_channel&& other)
  177. : service_(other.service_),
  178. executor_(other.executor_)
  179. {
  180. service_->move_construct(impl_, other.impl_);
  181. }
  182. /// Move-assign a basic_channel from another.
  183. /**
  184. * This assignment operator moves a channel from one object to another.
  185. * Cancels any outstanding asynchronous operations associated with the target
  186. * object.
  187. *
  188. * @param other The other basic_channel object from which the move will occur.
  189. *
  190. * @note Following the move, the moved-from object is in the same state as if
  191. * constructed using the @c basic_channel(const executor_type&)
  192. * constructor.
  193. */
  194. basic_channel& operator=(basic_channel&& other)
  195. {
  196. if (this != &other)
  197. {
  198. service_->move_assign(impl_, *other.service_, other.impl_);
  199. executor_.~executor_type();
  200. new (&executor_) executor_type(other.executor_);
  201. service_ = other.service_;
  202. }
  203. return *this;
  204. }
  205. // All channels have access to each other's implementations.
  206. template <typename, typename, typename...>
  207. friend class basic_channel;
  208. /// Move-construct a basic_channel from another.
  209. /**
  210. * This constructor moves a channel from one object to another.
  211. *
  212. * @param other The other basic_channel object from which the move will occur.
  213. *
  214. * @note Following the move, the moved-from object is in the same state as if
  215. * constructed using the @c basic_channel(const executor_type&)
  216. * constructor.
  217. */
  218. template <typename Executor1>
  219. basic_channel(
  220. basic_channel<Executor1, Traits, Signatures...>&& other,
  221. constraint_t<
  222. is_convertible<Executor1, Executor>::value
  223. > = 0)
  224. : service_(other.service_),
  225. executor_(other.executor_)
  226. {
  227. service_->move_construct(impl_, other.impl_);
  228. }
  229. /// Move-assign a basic_channel from another.
  230. /**
  231. * This assignment operator moves a channel from one object to another.
  232. * Cancels any outstanding asynchronous operations associated with the target
  233. * object.
  234. *
  235. * @param other The other basic_channel object from which the move will
  236. * occur.
  237. *
  238. * @note Following the move, the moved-from object is in the same state as if
  239. * constructed using the @c basic_channel(const executor_type&)
  240. * constructor.
  241. */
  242. template <typename Executor1>
  243. constraint_t<
  244. is_convertible<Executor1, Executor>::value,
  245. basic_channel&
  246. > operator=(basic_channel<Executor1, Traits, Signatures...>&& other)
  247. {
  248. if (this != &other)
  249. {
  250. service_->move_assign(impl_, *other.service_, other.impl_);
  251. executor_.~executor_type();
  252. new (&executor_) executor_type(other.executor_);
  253. service_ = other.service_;
  254. }
  255. return *this;
  256. }
  257. /// Destructor.
  258. ~basic_channel()
  259. {
  260. service_->destroy(impl_);
  261. }
  262. /// Get the executor associated with the object.
  263. const executor_type& get_executor() noexcept
  264. {
  265. return executor_;
  266. }
  267. /// Get the capacity of the channel's buffer.
  268. std::size_t capacity() noexcept
  269. {
  270. return service_->capacity(impl_);
  271. }
  272. /// Determine whether the channel is open.
  273. bool is_open() const noexcept
  274. {
  275. return service_->is_open(impl_);
  276. }
  277. /// Reset the channel to its initial state.
  278. void reset()
  279. {
  280. service_->reset(impl_);
  281. }
  282. /// Close the channel.
  283. void close()
  284. {
  285. service_->close(impl_);
  286. }
  287. /// Cancel all asynchronous operations waiting on the channel.
  288. /**
  289. * All outstanding send operations will complete with the error
  290. * @c asio::experimental::error::channel_cancelled. Outstanding receive
  291. * operations complete with the result as determined by the channel traits.
  292. */
  293. void cancel()
  294. {
  295. service_->cancel(impl_);
  296. }
  297. /// Determine whether a message can be received without blocking.
  298. bool ready() const noexcept
  299. {
  300. return service_->ready(impl_);
  301. }
  302. #if defined(GENERATING_DOCUMENTATION)
  303. /// Try to send a message without blocking.
  304. /**
  305. * Fails if the buffer is full and there are no waiting receive operations.
  306. *
  307. * @returns @c true on success, @c false on failure.
  308. */
  309. template <typename... Args>
  310. bool try_send(Args&&... args);
  311. /// Try to send a message without blocking, using dispatch semantics to call
  312. /// the receive operation's completion handler.
  313. /**
  314. * Fails if the buffer is full and there are no waiting receive operations.
  315. *
  316. * The receive operation's completion handler may be called from inside this
  317. * function.
  318. *
  319. * @returns @c true on success, @c false on failure.
  320. */
  321. template <typename... Args>
  322. bool try_send_via_dispatch(Args&&... args);
  323. /// Try to send a number of messages without blocking.
  324. /**
  325. * @returns The number of messages that were sent.
  326. */
  327. template <typename... Args>
  328. std::size_t try_send_n(std::size_t count, Args&&... args);
  329. /// Try to send a number of messages without blocking, using dispatch
  330. /// semantics to call the receive operations' completion handlers.
  331. /**
  332. * The receive operations' completion handlers may be called from inside this
  333. * function.
  334. *
  335. * @returns The number of messages that were sent.
  336. */
  337. template <typename... Args>
  338. std::size_t try_send_n_via_dispatch(std::size_t count, Args&&... args);
  339. /// Asynchronously send a message.
  340. /**
  341. * @par Completion Signature
  342. * @code void(asio::error_code) @endcode
  343. */
  344. template <typename... Args,
  345. ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))
  346. CompletionToken ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  347. auto async_send(Args&&... args,
  348. CompletionToken&& token);
  349. #endif // defined(GENERATING_DOCUMENTATION)
  350. /// Try to receive a message without blocking.
  351. /**
  352. * Fails if the buffer is full and there are no waiting receive operations.
  353. *
  354. * @returns @c true on success, @c false on failure.
  355. */
  356. template <typename Handler>
  357. bool try_receive(Handler&& handler)
  358. {
  359. return service_->try_receive(impl_, static_cast<Handler&&>(handler));
  360. }
  361. /// Asynchronously receive a message.
  362. /**
  363. * @par Completion Signature
  364. * As determined by the <tt>Signatures...</tt> template parameter and the
  365. * channel traits.
  366. */
  367. template <typename CompletionToken
  368. ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  369. auto async_receive(
  370. CompletionToken&& token
  371. ASIO_DEFAULT_COMPLETION_TOKEN(Executor))
  372. #if !defined(GENERATING_DOCUMENTATION)
  373. -> decltype(
  374. this->do_async_receive(static_cast<payload_type*>(0),
  375. static_cast<CompletionToken&&>(token)))
  376. #endif // !defined(GENERATING_DOCUMENTATION)
  377. {
  378. return this->do_async_receive(static_cast<payload_type*>(0),
  379. static_cast<CompletionToken&&>(token));
  380. }
  381. private:
  382. // Disallow copying and assignment.
  383. basic_channel(const basic_channel&) = delete;
  384. basic_channel& operator=(const basic_channel&) = delete;
  385. template <typename, typename, typename...>
  386. friend class detail::channel_send_functions;
  387. // Helper function to get an executor's context.
  388. template <typename T>
  389. static execution_context& get_context(const T& t,
  390. enable_if_t<execution::is_executor<T>::value>* = 0)
  391. {
  392. return asio::query(t, execution::context);
  393. }
  394. // Helper function to get an executor's context.
  395. template <typename T>
  396. static execution_context& get_context(const T& t,
  397. enable_if_t<!execution::is_executor<T>::value>* = 0)
  398. {
  399. return t.context();
  400. }
  401. class initiate_async_send
  402. {
  403. public:
  404. typedef Executor executor_type;
  405. explicit initiate_async_send(basic_channel* self)
  406. : self_(self)
  407. {
  408. }
  409. const executor_type& get_executor() const noexcept
  410. {
  411. return self_->get_executor();
  412. }
  413. template <typename SendHandler>
  414. void operator()(SendHandler&& handler,
  415. payload_type&& payload) const
  416. {
  417. asio::detail::non_const_lvalue<SendHandler> handler2(handler);
  418. self_->service_->async_send(self_->impl_,
  419. static_cast<payload_type&&>(payload),
  420. handler2.value, self_->get_executor());
  421. }
  422. private:
  423. basic_channel* self_;
  424. };
  425. class initiate_async_receive
  426. {
  427. public:
  428. typedef Executor executor_type;
  429. explicit initiate_async_receive(basic_channel* self)
  430. : self_(self)
  431. {
  432. }
  433. const executor_type& get_executor() const noexcept
  434. {
  435. return self_->get_executor();
  436. }
  437. template <typename ReceiveHandler>
  438. void operator()(ReceiveHandler&& handler) const
  439. {
  440. asio::detail::non_const_lvalue<ReceiveHandler> handler2(handler);
  441. self_->service_->async_receive(self_->impl_,
  442. handler2.value, self_->get_executor());
  443. }
  444. private:
  445. basic_channel* self_;
  446. };
  447. // The service associated with the I/O object.
  448. service_type* service_;
  449. // The underlying implementation of the I/O object.
  450. typename service_type::template implementation_type<
  451. Traits, Signatures...> impl_;
  452. // The associated executor.
  453. Executor executor_;
  454. };
  455. } // namespace experimental
  456. } // namespace asio
  457. #include "asio/detail/pop_options.hpp"
  458. #endif // ASIO_EXPERIMENTAL_BASIC_CHANNEL_HPP