basic_concurrent_channel.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. //
  2. // experimental/basic_concurrent_channel.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP
  11. #define BOOST_ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #include <boost/asio/detail/non_const_lvalue.hpp>
  17. #include <boost/asio/detail/mutex.hpp>
  18. #include <boost/asio/execution/executor.hpp>
  19. #include <boost/asio/execution_context.hpp>
  20. #include <boost/asio/experimental/detail/channel_send_functions.hpp>
  21. #include <boost/asio/experimental/detail/channel_service.hpp>
  22. #include <boost/asio/detail/push_options.hpp>
  23. namespace boost {
  24. namespace asio {
  25. namespace experimental {
  26. namespace detail {
  27. } // namespace detail
  28. /// A channel for messages.
  29. /**
  30. * The basic_concurrent_channel class template is used for sending messages
  31. * between different parts of the same application. A <em>message</em> is
  32. * defined as a collection of arguments to be passed to a completion handler,
  33. * and the set of messages supported by a channel is specified by its @c Traits
  34. * and <tt>Signatures...</tt> template parameters. Messages may be sent and
  35. * received using asynchronous or non-blocking synchronous operations.
  36. *
  37. * Unless customising the traits, applications will typically use the @c
  38. * experimental::concurrent_channel alias template. For example:
  39. * @code void send_loop(int i, steady_timer& timer,
  40. * concurrent_channel<void(error_code, int)>& ch)
  41. * {
  42. * if (i < 10)
  43. * {
  44. * timer.expires_after(chrono::seconds(1));
  45. * timer.async_wait(
  46. * [i, &timer, &ch](error_code error)
  47. * {
  48. * if (!error)
  49. * {
  50. * ch.async_send(error_code(), i,
  51. * [i, &timer, &ch](error_code error)
  52. * {
  53. * if (!error)
  54. * {
  55. * send_loop(i + 1, timer, ch);
  56. * }
  57. * });
  58. * }
  59. * });
  60. * }
  61. * else
  62. * {
  63. * ch.close();
  64. * }
  65. * }
  66. *
  67. * void receive_loop(concurrent_channel<void(error_code, int)>& ch)
  68. * {
  69. * ch.async_receive(
  70. * [&ch](error_code error, int i)
  71. * {
  72. * if (!error)
  73. * {
  74. * std::cout << "Received " << i << "\n";
  75. * receive_loop(ch);
  76. * }
  77. * });
  78. * } @endcode
  79. *
  80. * @par Thread Safety
  81. * @e Distinct @e objects: Safe.@n
  82. * @e Shared @e objects: Safe.
  83. *
  84. * The basic_concurrent_channel class template is thread-safe, and would
  85. * typically be used for passing messages between application code that runs on
  86. * different threads. Consider using @ref basic_channel, and its alias template
  87. * @c experimental::channel, to pass messages between code running in a single
  88. * thread or on the same strand.
  89. */
  90. template <typename Executor, typename Traits, typename... Signatures>
  91. class basic_concurrent_channel
  92. #if !defined(GENERATING_DOCUMENTATION)
  93. : public detail::channel_send_functions<
  94. basic_concurrent_channel<Executor, Traits, Signatures...>,
  95. Executor, Signatures...>
  96. #endif // !defined(GENERATING_DOCUMENTATION)
  97. {
  98. private:
  99. class initiate_async_send;
  100. class initiate_async_receive;
  101. typedef detail::channel_service<boost::asio::detail::mutex> service_type;
  102. typedef typename service_type::template implementation_type<
  103. Traits, Signatures...>::payload_type payload_type;
  104. template <typename... PayloadSignatures,
  105. BOOST_ASIO_COMPLETION_TOKEN_FOR(PayloadSignatures...) CompletionToken>
  106. auto do_async_receive(
  107. boost::asio::detail::completion_payload<PayloadSignatures...>*,
  108. CompletionToken&& token)
  109. -> decltype(
  110. async_initiate<CompletionToken, PayloadSignatures...>(
  111. declval<initiate_async_receive>(), token))
  112. {
  113. return async_initiate<CompletionToken, PayloadSignatures...>(
  114. initiate_async_receive(this), token);
  115. }
  116. public:
  117. /// The type of the executor associated with the channel.
  118. typedef Executor executor_type;
  119. /// Rebinds the channel type to another executor.
  120. template <typename Executor1>
  121. struct rebind_executor
  122. {
  123. /// The channel type when rebound to the specified executor.
  124. typedef basic_concurrent_channel<Executor1, Traits, Signatures...> other;
  125. };
  126. /// The traits type associated with the channel.
  127. typedef typename Traits::template rebind<Signatures...>::other traits_type;
  128. /// Construct a basic_concurrent_channel.
  129. /**
  130. * This constructor creates and channel.
  131. *
  132. * @param ex The I/O executor that the channel will use, by default, to
  133. * dispatch handlers for any asynchronous operations performed on the channel.
  134. *
  135. * @param max_buffer_size The maximum number of messages that may be buffered
  136. * in the channel.
  137. */
  138. basic_concurrent_channel(const executor_type& ex,
  139. std::size_t max_buffer_size = 0)
  140. : service_(&boost::asio::use_service<service_type>(
  141. basic_concurrent_channel::get_context(ex))),
  142. impl_(),
  143. executor_(ex)
  144. {
  145. service_->construct(impl_, max_buffer_size);
  146. }
  147. /// Construct and open a basic_concurrent_channel.
  148. /**
  149. * This constructor creates and opens a channel.
  150. *
  151. * @param context An execution context which provides the I/O executor that
  152. * the channel will use, by default, to dispatch handlers for any asynchronous
  153. * operations performed on the channel.
  154. *
  155. * @param max_buffer_size The maximum number of messages that may be buffered
  156. * in the channel.
  157. */
  158. template <typename ExecutionContext>
  159. basic_concurrent_channel(ExecutionContext& context,
  160. std::size_t max_buffer_size = 0,
  161. constraint_t<
  162. is_convertible<ExecutionContext&, execution_context&>::value,
  163. defaulted_constraint
  164. > = defaulted_constraint())
  165. : service_(&boost::asio::use_service<service_type>(context)),
  166. impl_(),
  167. executor_(context.get_executor())
  168. {
  169. service_->construct(impl_, max_buffer_size);
  170. }
  171. /// Move-construct a basic_concurrent_channel from another.
  172. /**
  173. * This constructor moves a channel from one object to another.
  174. *
  175. * @param other The other basic_concurrent_channel object from which the move
  176. * will occur.
  177. *
  178. * @note Following the move, the moved-from object is in the same state as if
  179. * constructed using the @c basic_concurrent_channel(const executor_type&)
  180. * constructor.
  181. */
  182. basic_concurrent_channel(basic_concurrent_channel&& other)
  183. : service_(other.service_),
  184. executor_(other.executor_)
  185. {
  186. service_->move_construct(impl_, other.impl_);
  187. }
  188. /// Move-assign a basic_concurrent_channel from another.
  189. /**
  190. * This assignment operator moves a channel from one object to another.
  191. * Cancels any outstanding asynchronous operations associated with the target
  192. * object.
  193. *
  194. * @param other The other basic_concurrent_channel object from which the move
  195. * will occur.
  196. *
  197. * @note Following the move, the moved-from object is in the same state as if
  198. * constructed using the @c basic_concurrent_channel(const executor_type&)
  199. * constructor.
  200. */
  201. basic_concurrent_channel& operator=(basic_concurrent_channel&& other)
  202. {
  203. if (this != &other)
  204. {
  205. service_->move_assign(impl_, *other.service_, other.impl_);
  206. executor_.~executor_type();
  207. new (&executor_) executor_type(other.executor_);
  208. service_ = other.service_;
  209. }
  210. return *this;
  211. }
  212. // All channels have access to each other's implementations.
  213. template <typename, typename, typename...>
  214. friend class basic_concurrent_channel;
  215. /// Move-construct a basic_concurrent_channel from another.
  216. /**
  217. * This constructor moves a channel from one object to another.
  218. *
  219. * @param other The other basic_concurrent_channel object from which the move
  220. * will occur.
  221. *
  222. * @note Following the move, the moved-from object is in the same state as if
  223. * constructed using the @c basic_concurrent_channel(const executor_type&)
  224. * constructor.
  225. */
  226. template <typename Executor1>
  227. basic_concurrent_channel(
  228. basic_concurrent_channel<Executor1, Traits, Signatures...>&& other,
  229. constraint_t<
  230. is_convertible<Executor1, Executor>::value
  231. > = 0)
  232. : service_(other.service_),
  233. executor_(other.executor_)
  234. {
  235. service_->move_construct(impl_, other.impl_);
  236. }
  237. /// Move-assign a basic_concurrent_channel from another.
  238. /**
  239. * This assignment operator moves a channel from one object to another.
  240. * Cancels any outstanding asynchronous operations associated with the target
  241. * object.
  242. *
  243. * @param other The other basic_concurrent_channel object from which the move
  244. * will occur.
  245. *
  246. * @note Following the move, the moved-from object is in the same state as if
  247. * constructed using the @c basic_concurrent_channel(const executor_type&)
  248. * constructor.
  249. */
  250. template <typename Executor1>
  251. constraint_t<
  252. is_convertible<Executor1, Executor>::value,
  253. basic_concurrent_channel&
  254. > operator=(
  255. basic_concurrent_channel<Executor1, Traits, Signatures...>&& other)
  256. {
  257. if (this != &other)
  258. {
  259. service_->move_assign(impl_, *other.service_, other.impl_);
  260. executor_.~executor_type();
  261. new (&executor_) executor_type(other.executor_);
  262. service_ = other.service_;
  263. }
  264. return *this;
  265. }
  266. /// Destructor.
  267. ~basic_concurrent_channel()
  268. {
  269. service_->destroy(impl_);
  270. }
  271. /// Get the executor associated with the object.
  272. const executor_type& get_executor() noexcept
  273. {
  274. return executor_;
  275. }
  276. /// Get the capacity of the channel's buffer.
  277. std::size_t capacity() noexcept
  278. {
  279. return service_->capacity(impl_);
  280. }
  281. /// Determine whether the channel is open.
  282. bool is_open() const noexcept
  283. {
  284. return service_->is_open(impl_);
  285. }
  286. /// Reset the channel to its initial state.
  287. void reset()
  288. {
  289. service_->reset(impl_);
  290. }
  291. /// Close the channel.
  292. void close()
  293. {
  294. service_->close(impl_);
  295. }
  296. /// Cancel all asynchronous operations waiting on the channel.
  297. /**
  298. * All outstanding send operations will complete with the error
  299. * @c boost::asio::experimental::error::channel_cancelled. Outstanding receive
  300. * operations complete with the result as determined by the channel traits.
  301. */
  302. void cancel()
  303. {
  304. service_->cancel(impl_);
  305. }
  306. /// Determine whether a message can be received without blocking.
  307. bool ready() const noexcept
  308. {
  309. return service_->ready(impl_);
  310. }
  311. #if defined(GENERATING_DOCUMENTATION)
  312. /// Try to send a message without blocking.
  313. /**
  314. * Fails if the buffer is full and there are no waiting receive operations.
  315. *
  316. * @returns @c true on success, @c false on failure.
  317. */
  318. template <typename... Args>
  319. bool try_send(Args&&... args);
  320. /// Try to send a message without blocking, using dispatch semantics to call
  321. /// the receive operation's completion handler.
  322. /**
  323. * Fails if the buffer is full and there are no waiting receive operations.
  324. *
  325. * The receive operation's completion handler may be called from inside this
  326. * function.
  327. *
  328. * @returns @c true on success, @c false on failure.
  329. */
  330. template <typename... Args>
  331. bool try_send_via_dispatch(Args&&... args);
  332. /// Try to send a number of messages without blocking.
  333. /**
  334. * @returns The number of messages that were sent.
  335. */
  336. template <typename... Args>
  337. std::size_t try_send_n(std::size_t count, Args&&... args);
  338. /// Try to send a number of messages without blocking, using dispatch
  339. /// semantics to call the receive operations' completion handlers.
  340. /**
  341. * The receive operations' completion handlers may be called from inside this
  342. * function.
  343. *
  344. * @returns The number of messages that were sent.
  345. */
  346. template <typename... Args>
  347. std::size_t try_send_n_via_dispatch(std::size_t count, Args&&... args);
  348. /// Asynchronously send a message.
  349. template <typename... Args,
  350. BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code))
  351. CompletionToken BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  352. auto async_send(Args&&... args,
  353. CompletionToken&& token);
  354. #endif // defined(GENERATING_DOCUMENTATION)
  355. /// Try to receive a message without blocking.
  356. /**
  357. * Fails if the buffer is full and there are no waiting receive operations.
  358. *
  359. * @returns @c true on success, @c false on failure.
  360. */
  361. template <typename Handler>
  362. bool try_receive(Handler&& handler)
  363. {
  364. return service_->try_receive(impl_, static_cast<Handler&&>(handler));
  365. }
  366. /// Asynchronously receive a message.
  367. template <typename CompletionToken
  368. BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  369. auto async_receive(
  370. CompletionToken&& token
  371. BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(Executor))
  372. #if !defined(GENERATING_DOCUMENTATION)
  373. -> decltype(
  374. this->do_async_receive(static_cast<payload_type*>(0),
  375. static_cast<CompletionToken&&>(token)))
  376. #endif // !defined(GENERATING_DOCUMENTATION)
  377. {
  378. return this->do_async_receive(static_cast<payload_type*>(0),
  379. static_cast<CompletionToken&&>(token));
  380. }
  381. private:
  382. // Disallow copying and assignment.
  383. basic_concurrent_channel(
  384. const basic_concurrent_channel&) = delete;
  385. basic_concurrent_channel& operator=(
  386. const basic_concurrent_channel&) = delete;
  387. template <typename, typename, typename...>
  388. friend class detail::channel_send_functions;
  389. // Helper function to get an executor's context.
  390. template <typename T>
  391. static execution_context& get_context(const T& t,
  392. enable_if_t<execution::is_executor<T>::value>* = 0)
  393. {
  394. return boost::asio::query(t, execution::context);
  395. }
  396. // Helper function to get an executor's context.
  397. template <typename T>
  398. static execution_context& get_context(const T& t,
  399. enable_if_t<!execution::is_executor<T>::value>* = 0)
  400. {
  401. return t.context();
  402. }
  403. class initiate_async_send
  404. {
  405. public:
  406. typedef Executor executor_type;
  407. explicit initiate_async_send(basic_concurrent_channel* self)
  408. : self_(self)
  409. {
  410. }
  411. const executor_type& get_executor() const noexcept
  412. {
  413. return self_->get_executor();
  414. }
  415. template <typename SendHandler>
  416. void operator()(SendHandler&& handler,
  417. payload_type&& payload) const
  418. {
  419. boost::asio::detail::non_const_lvalue<SendHandler> handler2(handler);
  420. self_->service_->async_send(self_->impl_,
  421. static_cast<payload_type&&>(payload),
  422. handler2.value, self_->get_executor());
  423. }
  424. private:
  425. basic_concurrent_channel* self_;
  426. };
  427. class initiate_async_receive
  428. {
  429. public:
  430. typedef Executor executor_type;
  431. explicit initiate_async_receive(basic_concurrent_channel* self)
  432. : self_(self)
  433. {
  434. }
  435. const executor_type& get_executor() const noexcept
  436. {
  437. return self_->get_executor();
  438. }
  439. template <typename ReceiveHandler>
  440. void operator()(ReceiveHandler&& handler) const
  441. {
  442. boost::asio::detail::non_const_lvalue<ReceiveHandler> handler2(handler);
  443. self_->service_->async_receive(self_->impl_,
  444. handler2.value, self_->get_executor());
  445. }
  446. private:
  447. basic_concurrent_channel* self_;
  448. };
  449. // The service associated with the I/O object.
  450. service_type* service_;
  451. // The underlying implementation of the I/O object.
  452. typename service_type::template implementation_type<
  453. Traits, Signatures...> impl_;
  454. // The associated executor.
  455. Executor executor_;
  456. };
  457. } // namespace experimental
  458. } // namespace asio
  459. } // namespace boost
  460. #include <boost/asio/detail/pop_options.hpp>
  461. #endif // BOOST_ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP