| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513 | //// experimental/basic_concurrent_channel.hpp// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~//// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)//// Distributed under the Boost Software License, Version 1.0. (See accompanying// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//#ifndef ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP#define ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP#if defined(_MSC_VER) && (_MSC_VER >= 1200)# pragma once#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)#include "asio/detail/config.hpp"#include "asio/detail/non_const_lvalue.hpp"#include "asio/detail/mutex.hpp"#include "asio/execution/executor.hpp"#include "asio/execution_context.hpp"#include "asio/experimental/detail/channel_send_functions.hpp"#include "asio/experimental/detail/channel_service.hpp"#include "asio/detail/push_options.hpp"namespace asio {namespace experimental {namespace detail {} // namespace detail/// A channel for messages./** * The basic_concurrent_channel class template is used for sending messages * between different parts of the same application. A <em>message</em> is * defined as a collection of arguments to be passed to a completion handler, * and the set of messages supported by a channel is specified by its @c Traits * and <tt>Signatures...</tt> template parameters. Messages may be sent and * received using asynchronous or non-blocking synchronous operations. * * Unless customising the traits, applications will typically use the @c * experimental::concurrent_channel alias template. For example: * @code void send_loop(int i, steady_timer& timer, *     concurrent_channel<void(error_code, int)>& ch) * { *   if (i < 10) *   { *     timer.expires_after(chrono::seconds(1)); *     timer.async_wait( *         [i, &timer, &ch](error_code error) *         { *           if (!error) *           { *             ch.async_send(error_code(), i, *                 [i, &timer, &ch](error_code error) *                 { *                   if (!error) *                   { *                     send_loop(i + 1, timer, ch); *                   } *                 }); *           } *         }); *   } *   else *   { *     ch.close(); *   } * } * * void receive_loop(concurent_channel<void(error_code, int)>& ch) * { *   ch.async_receive( *       [&ch](error_code error, int i) *       { *         if (!error) *         { *           std::cout << "Received " << i << "\n"; *           receive_loop(ch); *         } *       }); * } @endcode * * @par Thread Safety * @e Distinct @e objects: Safe.@n * @e Shared @e objects: Safe. * * The basic_concurrent_channel class template is thread-safe, and would * typically be used for passing messages between application code that run on * different threads. Consider using @ref basic_channel, and its alias template * @c experimental::channel, to pass messages between code running in a single * thread or on the same strand. */template <typename Executor, typename Traits, typename... Signatures>class basic_concurrent_channel#if !defined(GENERATING_DOCUMENTATION)  : public detail::channel_send_functions<      basic_concurrent_channel<Executor, Traits, Signatures...>,      Executor, Signatures...>#endif // !defined(GENERATING_DOCUMENTATION){private:  class initiate_async_send;  class initiate_async_receive;  typedef detail::channel_service<asio::detail::mutex> service_type;  typedef typename service_type::template implementation_type<      Traits, Signatures...>::payload_type payload_type;  template <typename... PayloadSignatures,      ASIO_COMPLETION_TOKEN_FOR(PayloadSignatures...) CompletionToken>  auto do_async_receive(detail::channel_payload<PayloadSignatures...>*,      CompletionToken&& token)    -> decltype(        async_initiate<CompletionToken, PayloadSignatures...>(          declval<initiate_async_receive>(), token))  {    return async_initiate<CompletionToken, PayloadSignatures...>(        initiate_async_receive(this), token);  }public:  /// The type of the executor associated with the channel.  typedef Executor executor_type;  /// Rebinds the channel type to another executor.  template <typename Executor1>  struct rebind_executor  {    /// The channel type when rebound to the specified executor.    typedef basic_concurrent_channel<Executor1, Traits, Signatures...> other;  };  /// The traits type associated with the channel.  typedef typename Traits::template rebind<Signatures...>::other traits_type;  /// Construct a basic_concurrent_channel.  /**   * This constructor creates and channel.   *   * @param ex The I/O executor that the channel will use, by default, to   * dispatch handlers for any asynchronous operations performed on the channel.   *   * @param max_buffer_size The maximum number of messages that may be buffered   * in the channel.   */  basic_concurrent_channel(const executor_type& ex,      std::size_t max_buffer_size = 0)    : service_(&asio::use_service<service_type>(            basic_concurrent_channel::get_context(ex))),      impl_(),      executor_(ex)  {    service_->construct(impl_, max_buffer_size);  }  /// Construct and open a basic_concurrent_channel.  /**   * This constructor creates and opens a channel.   *   * @param context An execution context which provides the I/O executor that   * the channel will use, by default, to dispatch handlers for any asynchronous   * operations performed on the channel.   *   * @param max_buffer_size The maximum number of messages that may be buffered   * in the channel.   */  template <typename ExecutionContext>  basic_concurrent_channel(ExecutionContext& context,      std::size_t max_buffer_size = 0,      constraint_t<        is_convertible<ExecutionContext&, execution_context&>::value,        defaulted_constraint      > = defaulted_constraint())    : service_(&asio::use_service<service_type>(context)),      impl_(),      executor_(context.get_executor())  {    service_->construct(impl_, max_buffer_size);  }  /// Move-construct a basic_concurrent_channel from another.  /**   * This constructor moves a channel from one object to another.   *   * @param other The other basic_concurrent_channel object from which the move   * will occur.   *   * @note Following the move, the moved-from object is in the same state as if   * constructed using the @c basic_concurrent_channel(const executor_type&)   * constructor.   */  basic_concurrent_channel(basic_concurrent_channel&& other)    : service_(other.service_),      executor_(other.executor_)  {    service_->move_construct(impl_, other.impl_);  }  /// Move-assign a basic_concurrent_channel from another.  /**   * This assignment operator moves a channel from one object to another.   * Cancels any outstanding asynchronous operations associated with the target   * object.   *   * @param other The other basic_concurrent_channel object from which the move   * will occur.   *   * @note Following the move, the moved-from object is in the same state as if   * constructed using the @c basic_concurrent_channel(const executor_type&)   * constructor.   */  basic_concurrent_channel& operator=(basic_concurrent_channel&& other)  {    if (this != &other)    {      service_->move_assign(impl_, *other.service_, other.impl_);      executor_.~executor_type();      new (&executor_) executor_type(other.executor_);      service_ = other.service_;    }    return *this;  }  // All channels have access to each other's implementations.  template <typename, typename, typename...>  friend class basic_concurrent_channel;  /// Move-construct a basic_concurrent_channel from another.  /**   * This constructor moves a channel from one object to another.   *   * @param other The other basic_concurrent_channel object from which the move   * will occur.   *   * @note Following the move, the moved-from object is in the same state as if   * constructed using the @c basic_concurrent_channel(const executor_type&)   * constructor.   */  template <typename Executor1>  basic_concurrent_channel(      basic_concurrent_channel<Executor1, Traits, Signatures...>&& other,      constraint_t<          is_convertible<Executor1, Executor>::value      > = 0)    : service_(other.service_),      executor_(other.executor_)  {    service_->move_construct(impl_, other.impl_);  }  /// Move-assign a basic_concurrent_channel from another.  /**   * This assignment operator moves a channel from one object to another.   * Cancels any outstanding asynchronous operations associated with the target   * object.   *   * @param other The other basic_concurrent_channel object from which the move   * will occur.   *   * @note Following the move, the moved-from object is in the same state as if   * constructed using the @c basic_concurrent_channel(const executor_type&)   * constructor.   */  template <typename Executor1>  constraint_t<    is_convertible<Executor1, Executor>::value,    basic_concurrent_channel&  > operator=(      basic_concurrent_channel<Executor1, Traits, Signatures...>&& other)  {    if (this != &other)    {      service_->move_assign(impl_, *other.service_, other.impl_);      executor_.~executor_type();      new (&executor_) executor_type(other.executor_);      service_ = other.service_;    }    return *this;  }  /// Destructor.  ~basic_concurrent_channel()  {    service_->destroy(impl_);  }  /// Get the executor associated with the object.  const executor_type& get_executor() noexcept  {    return executor_;  }  /// Get the capacity of the channel's buffer.  std::size_t capacity() noexcept  {    return service_->capacity(impl_);  }  /// Determine whether the channel is open.  bool is_open() const noexcept  {    return service_->is_open(impl_);  }  /// Reset the channel to its initial state.  void reset()  {    service_->reset(impl_);  }  /// Close the channel.  void close()  {    service_->close(impl_);  }  /// Cancel all asynchronous operations waiting on the channel.  /**   * All outstanding send operations will complete with the error   * @c asio::experimental::error::channel_cancelled. Outstanding receive   * operations complete with the result as determined by the channel traits.   */  void cancel()  {    service_->cancel(impl_);  }  /// Determine whether a message can be received without blocking.  bool ready() const noexcept  {    return service_->ready(impl_);  }#if defined(GENERATING_DOCUMENTATION)  /// Try to send a message without blocking.  /**   * Fails if the buffer is full and there are no waiting receive operations.   *   * @returns @c true on success, @c false on failure.   */  template <typename... Args>  bool try_send(Args&&... args);  /// Try to send a message without blocking, using dispatch semantics to call  /// the receive operation's completion handler.  /**   * Fails if the buffer is full and there are no waiting receive operations.   *   * The receive operation's completion handler may be called from inside this   * function.   *   * @returns @c true on success, @c false on failure.   */  template <typename... Args>  bool try_send_via_dispatch(Args&&... args);  /// Try to send a number of messages without blocking.  /**   * @returns The number of messages that were sent.   */  template <typename... Args>  std::size_t try_send_n(std::size_t count, Args&&... args);  /// Try to send a number of messages without blocking, using dispatch  /// semantics to call the receive operations' completion handlers.  /**   * The receive operations' completion handlers may be called from inside this   * function.   *   * @returns The number of messages that were sent.   */  template <typename... Args>  std::size_t try_send_n_via_dispatch(std::size_t count, Args&&... args);  /// Asynchronously send a message.  template <typename... Args,      ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code))        CompletionToken ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>  auto async_send(Args&&... args,      CompletionToken&& token);#endif // defined(GENERATING_DOCUMENTATION)  /// Try to receive a message without blocking.  /**   * Fails if the buffer is full and there are no waiting receive operations.   *   * @returns @c true on success, @c false on failure.   */  template <typename Handler>  bool try_receive(Handler&& handler)  {    return service_->try_receive(impl_, static_cast<Handler&&>(handler));  }  /// Asynchronously receive a message.  template <typename CompletionToken      ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>  auto async_receive(      CompletionToken&& token        ASIO_DEFAULT_COMPLETION_TOKEN(Executor))#if !defined(GENERATING_DOCUMENTATION)    -> decltype(        this->do_async_receive(static_cast<payload_type*>(0),          static_cast<CompletionToken&&>(token)))#endif // !defined(GENERATING_DOCUMENTATION)  {    return this->do_async_receive(static_cast<payload_type*>(0),        static_cast<CompletionToken&&>(token));  }private:  // Disallow copying and assignment.  basic_concurrent_channel(      const basic_concurrent_channel&) = delete;  basic_concurrent_channel& operator=(      const basic_concurrent_channel&) = delete;  template <typename, typename, typename...>  friend class detail::channel_send_functions;  // Helper function to get an executor's context.  template <typename T>  static execution_context& get_context(const T& t,      enable_if_t<execution::is_executor<T>::value>* = 0)  {    return asio::query(t, execution::context);  }  // Helper function to get an executor's context.  template <typename T>  static execution_context& get_context(const T& t,      enable_if_t<!execution::is_executor<T>::value>* = 0)  {    return t.context();  }  class initiate_async_send  {  public:    typedef Executor executor_type;    explicit initiate_async_send(basic_concurrent_channel* self)      : self_(self)    {    }    const executor_type& get_executor() const noexcept    {      return self_->get_executor();    }    template <typename SendHandler>    void operator()(SendHandler&& handler,        payload_type&& payload) const    {      asio::detail::non_const_lvalue<SendHandler> handler2(handler);      self_->service_->async_send(self_->impl_,          static_cast<payload_type&&>(payload),          handler2.value, self_->get_executor());    }  private:    basic_concurrent_channel* self_;  };  class initiate_async_receive  {  public:    typedef Executor executor_type;    explicit initiate_async_receive(basic_concurrent_channel* self)      : self_(self)    {    }    const executor_type& get_executor() const noexcept    {      return self_->get_executor();    }    template <typename ReceiveHandler>    void operator()(ReceiveHandler&& handler) const    {      asio::detail::non_const_lvalue<ReceiveHandler> handler2(handler);      self_->service_->async_receive(self_->impl_,          handler2.value, self_->get_executor());    }  private:    basic_concurrent_channel* self_;  };  // The service associated with the I/O object.  service_type* service_;  // The underlying implementation of the I/O object.  typename service_type::template implementation_type<      Traits, Signatures...> impl_;  // The associated executor.  Executor executor_;};} // namespace experimental} // namespace asio#include "asio/detail/pop_options.hpp"#endif // ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP
 |