//
// Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_COBALT_CHANNEL_HPP
#define BOOST_COBALT_CHANNEL_HPP

#include <boost/cobalt/this_thread.hpp>
#include <boost/cobalt/unique_handle.hpp>
#include <boost/cobalt/detail/util.hpp>

#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/circular_buffer.hpp>
#include <boost/config.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/variant2/variant.hpp>

#include <optional>

namespace boost::cobalt
{

template<typename T>
struct channel_reader;

// A channel carrying values of type T between coroutines.
//
// Values are held in a circular buffer (`buffer_`); `read()` and `write()`
// return awaitable operation objects. Most member functions are only declared
// here; their definitions are pulled in by the include at the end of this file.
//
// NOTE: the `tag::outline[]` / `end::outline[]` comments are asciidoc region
// markers. Keep them in place when editing.
// tag::outline[]
template<typename T>
struct channel
{
  // end::outline[]
#if defined(BOOST_COBALT_NO_PMR)
  // Variant without a memory resource parameter, used when PMR support is disabled.
  // NOTE(review): unlike the PMR overload below this constructor is not `explicit`.
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor());
#else
  // tag::outline[]
  // create a channel with a buffer limit, executor & resource.
  explicit
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor(),
          pmr::memory_resource * resource = this_thread::get_default_resource());
  // end::outline[]
#endif
  // tag::outline[]
  // not movable.
  channel(channel && rhs) noexcept = delete;
  channel & operator=(channel && lhs) noexcept = delete;

  using executor_type = executor;
  const executor_type & get_executor();

  // Closes the channel
  ~channel();
  bool is_open() const;
  // close the channel, will cancel all pending ops, too
  void close();

  // end::outline[]
 private:
  // Storage for buffered values; uses a polymorphic allocator unless PMR is disabled.
#if !defined(BOOST_COBALT_NO_PMR)
  boost::circular_buffer<T, pmr::polymorphic_allocator<T>> buffer_;
#else
  boost::circular_buffer<T> buffer_;
#endif
  executor_type executor_;
  bool is_closed_{false};

  // Awaitable returned by `read()`.
  //
  // It is an intrusive list node using the auto_unlink link mode, so it can be
  // stored in `read_queue_`. The struct is brace-initialised as an aggregate by
  // `read()` (hook, chn, loc), so the order of the leading members matters.
  struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  {
    channel * chn;                 // channel this operation reads from
    boost::source_location loc;    // source location captured by `read()`
    bool cancelled = false;
    // NOTE(review): presumably holds a value handed over by a writer without
    // going through `buffer_` — confirm against the implementation.
    std::optional<T> direct{};
    asio::cancellation_slot cancel_slot{};
    // NOTE(review): presumably the coroutine suspended on this op — confirm.
    unique_handle<void> awaited_from{nullptr};
    // Optional hook invoked by `transactional_unlink()` before unlinking.
    void (*begin_transaction)(void*) = nullptr;

    // Calls `begin_transaction` (if set) with the raw `awaited_from` handle,
    // then removes this node from whatever list it is linked into.
    void transactional_unlink()
    {
      if (begin_transaction)
        begin_transaction(awaited_from.get());
      this->unlink();
    }

    struct cancel_impl;
    // Ready without suspending when the channel's buffer holds at least one value.
    bool await_ready() { return !chn->buffer_.empty(); }
    template<typename Promise>
    BOOST_NOINLINE
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
    // Three result flavours: the plain value, an (error_code, value) tuple,
    // or a system::result<T>. The tag parameter selects the overload.
    T await_resume();
    std::tuple<system::error_code, T> await_resume(const struct as_tuple_tag & );
    system::result<T> await_resume(const struct as_result_tag &);
    // True when the op refers to a channel and that channel is open.
    explicit operator bool() const {return chn && chn->is_open();}
  };

  // Awaitable returned by the `write()` overloads.
  //
  // Like `read_op` it is an auto_unlink intrusive list node and is
  // brace-initialised as an aggregate (hook, chn, ref, loc).
  struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  {
    channel * chn;                 // channel this operation writes to
    // Pointer to the value to be written. For copy-constructible T it is either
    // a mutable or a const pointer; otherwise only a mutable pointer is possible.
    using ref_t = std::conditional_t<
                      std::is_copy_constructible_v<T>,
                      variant2::variant<T*, const T*>,
                      T*>;
    ref_t ref;
    boost::source_location loc;    // source location captured by `write()`
    bool cancelled = false, direct = false;
    asio::cancellation_slot cancel_slot{};
    // NOTE(review): presumably the coroutine suspended on this op — confirm.
    unique_handle<void> awaited_from{nullptr};
    // Optional hook invoked by `transactional_unlink()` before unlinking.
    void (*begin_transaction)(void*) = nullptr;

    // Calls `begin_transaction` (if set) with the raw `awaited_from` handle,
    // then removes this node from whatever list it is linked into.
    void transactional_unlink()
    {
      if (begin_transaction)
        begin_transaction(awaited_from.get());
      this->unlink();
    }

    struct cancel_impl;

    // Ready without suspending when the channel's buffer is not full.
    bool await_ready() { return !chn->buffer_.full(); }
    template<typename Promise>
    BOOST_NOINLINE
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
    // Three result flavours: nothing, an (error_code) tuple, or a
    // system::result<void>. The tag parameter selects the overload.
    void await_resume();
    std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
    system::result<void> await_resume(const struct as_result_tag &);
    // True when the op refers to a channel and that channel is open.
    explicit operator bool() const {return chn && chn->is_open();}
  };

  // Intrusive lists of read/write operations (size is not tracked in constant time).
  boost::intrusive::list<read_op,  intrusive::constant_time_size<false> > read_queue_;
  boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
 public:
  // Creates a read operation bound to this channel; `loc` defaults to the call site.
  read_op   read(const boost::source_location & loc = BOOST_CURRENT_LOCATION)  {return  read_op{{}, this, loc}; }

  // The write overloads below store only the ADDRESS of `value` in the returned op.
  // NOTE(review): this implies `value` must stay alive until the op has been
  // awaited — confirm against the implementation of await_suspend/await_resume.
  // The const overloads are only available when T is copy constructible.
#if defined(BOOST_WINDOWS_API)
  BOOST_NOINLINE
#endif
  write_op write(const T  && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
    requires std::is_copy_constructible_v<T>
  {
    return write_op{{}, this, &value, loc};
  }

#if defined(BOOST_WINDOWS_API)
  BOOST_NOINLINE
#endif
  write_op write(const T  &  value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
    requires std::is_copy_constructible_v<T>
  {
    return write_op{{}, this, &value, loc};
  }

#if defined(BOOST_WINDOWS_API)
  BOOST_NOINLINE
#endif
  write_op write(      T &&  value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  {
    return write_op{{}, this, &value, loc};
  }

#if defined(BOOST_WINDOWS_API)
  BOOST_NOINLINE
#endif
  write_op write(      T  &  value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  {
    return write_op{{}, this, &value, loc};
  }
  /*
  // tag::outline[]
  // an awaitable that yields T
  using __read_op__ = __unspecified__;

  // an awaitable that yields void
  using __write_op__ = __unspecified__;

  // read a value from the channel
  __read_op__  read();

  // write a value to the channel
  __write_op__ write(const T  && value);
  __write_op__ write(const T  &  value);
  __write_op__ write(      T &&  value);
  __write_op__ write(      T  &  value);

  // write a value to the channel if T is void
  __write_op__ write();
  // end::outline[]
   */
  // tag::outline[]
};
// end::outline[]

// Specialisation for channels that carry no value.
//
// Instead of a buffer it keeps a counter `n_` and an upper bound `limit_`;
// reads are ready while `n_ > 0` and writes are ready while `n_ < limit_`.
template<>
struct channel<void>
{
  // Stores the limit and executor; the counter starts at zero and the channel open.
  explicit
  channel(std::size_t limit = 0u,
          executor executor = this_thread::get_executor())
        : limit_(limit), executor_(executor) {}
  // not movable.
  channel(channel &&) noexcept = delete;
  channel & operator=(channel && lhs) noexcept = delete;

  using executor_type = executor;
  const executor_type & get_executor() {return executor_;}

  BOOST_COBALT_DECL ~channel();

  bool is_open() const {return !is_closed_;}
  BOOST_COBALT_DECL void close();

 private:
  std::size_t limit_;      // upper bound checked by write_op::await_ready
  std::size_t n_{0u};      // current count checked by both await_ready functions
  executor_type executor_;
  bool is_closed_{false};

  // Awaitable returned by `read()`; an auto_unlink intrusive list node that is
  // brace-initialised as an aggregate (hook, chn, loc).
  struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  {
    channel * chn;                 // channel this operation reads from
    boost::source_location loc;    // source location captured by `read()`
    bool cancelled = false, direct = false;
    asio::cancellation_slot cancel_slot{};
    // NOTE(review): presumably the coroutine suspended on this op — confirm.
    unique_handle<void> awaited_from{nullptr};
    // Optional hook invoked by `transactional_unlink()` before unlinking.
    void (*begin_transaction)(void*) = nullptr;

    // Calls `begin_transaction` (if set) with the raw `awaited_from` handle,
    // then removes this node from whatever list it is linked into.
    void transactional_unlink()
    {
      if (begin_transaction)
        begin_transaction(awaited_from.get());
      this->unlink();
    }

    struct cancel_impl;
    // Ready without suspending when the channel's counter is non-zero.
    bool await_ready() { return (chn->n_ > 0); }
    template<typename Promise>
    BOOST_NOINLINE
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
    // Result flavours: nothing, an (error_code) tuple, or a system::result<void>.
    BOOST_COBALT_DECL void await_resume();
    BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
    BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
    // True when the op refers to a channel and that channel is open.
    explicit operator bool() const {return chn && chn->is_open();}
  };

  // Awaitable returned by `write()`; an auto_unlink intrusive list node that is
  // brace-initialised as an aggregate (hook, chn, loc).
  struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  {
    channel * chn;                 // channel this operation writes to
    boost::source_location loc;    // source location captured by `write()`
    bool cancelled = false, direct = false;
    asio::cancellation_slot cancel_slot{};
    // NOTE(review): presumably the coroutine suspended on this op — confirm.
    unique_handle<void> awaited_from{nullptr};
    // Optional hook invoked by `transactional_unlink()` before unlinking.
    void (*begin_transaction)(void*) = nullptr;

    // Calls `begin_transaction` (if set) with the raw `awaited_from` handle,
    // then removes this node from whatever list it is linked into.
    void transactional_unlink()
    {
      if (begin_transaction)
        begin_transaction(awaited_from.get());
      this->unlink();
    }

    struct cancel_impl;
    // Ready without suspending while the counter is below the limit.
    bool await_ready()
    {
      return chn->n_ < chn->limit_;
    }

    template<typename Promise>
    BOOST_NOINLINE
    std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);

    // Result flavours: nothing, an (error_code) tuple, or a system::result<void>.
    BOOST_COBALT_DECL void await_resume();
    BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
    BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
    // True when the op refers to a channel and that channel is open.
    explicit operator bool() const {return chn && chn->is_open();}
  };

  // Intrusive lists of read/write operations (size is not tracked in constant time).
  boost::intrusive::list<read_op,  intrusive::constant_time_size<false> > read_queue_;
  boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
 public:
  // Create a read / write operation bound to this channel; `loc` defaults to the call site.
  read_op   read(const boost::source_location & loc = BOOST_CURRENT_LOCATION)  {return  read_op{{}, this, loc}; }
  write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION)  {return write_op{{}, this, loc}; }
};

// Lightweight handle that can be `co_await`ed to read from a channel.
//
// It stores a pointer to the channel (not a copy) together with a source
// location; each `co_await` forwards to `channel<T>::read(loc_)`.
template<typename T>
struct channel_reader
{
  // Binds the reader to `chan`; `loc` defaults to the construction site.
  channel_reader(channel<T> & chan,
                 const boost::source_location & loc = BOOST_CURRENT_LOCATION) : chan_(&chan), loc_(loc) {}

  // Produces a fresh read operation on the bound channel.
  auto operator co_await ()
  {
    return chan_->read(loc_);
  }

  // True when a channel is bound and that channel is open.
  explicit operator bool () const {return chan_ && chan_->is_open();}

 private:
  channel<T> * chan_;
  boost::source_location loc_;
};

}

// Definitions of the members declared above.
#include <boost/cobalt/impl/channel.hpp>

#endif //BOOST_COBALT_CHANNEL_HPP