// // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BOOST_COBALT_CHANNEL_HPP #define BOOST_COBALT_CHANNEL_HPP #include #include #include #include #include #include #include #include #include #include namespace boost::cobalt { template struct channel_reader; // tag::outline[] template struct channel { // end::outline[] #if defined(BOOST_COBALT_NO_PMR) channel(std::size_t limit = 0u, executor executor = this_thread::get_executor()); #else // tag::outline[] // create a channel with a buffer limit, executor & resource. explicit channel(std::size_t limit = 0u, executor executor = this_thread::get_executor(), pmr::memory_resource * resource = this_thread::get_default_resource()); // end::outline[] #endif // tag::outline[] // not movable. channel(channel && rhs) noexcept = delete; channel & operator=(channel && lhs) noexcept = delete; using executor_type = executor; const executor_type & get_executor(); // Closes the channel ~channel(); bool is_open() const; // close the operation, will cancel all pending ops, too void close(); // end::outline[] private: #if !defined(BOOST_COBALT_NO_PMR) boost::circular_buffer> buffer_; #else boost::circular_buffer buffer_; #endif executor_type executor_; bool is_closed_{false}; struct read_op : intrusive::list_base_hook > { channel * chn; boost::source_location loc; bool cancelled = false; std::optional direct{}; asio::cancellation_slot cancel_slot{}; unique_handle awaited_from{nullptr}; void (*begin_transaction)(void*) = nullptr; void transactional_unlink() { if (begin_transaction) begin_transaction(awaited_from.get()); this->unlink(); } struct cancel_impl; bool await_ready() { return !chn->buffer_.empty(); } template BOOST_NOINLINE std::coroutine_handle await_suspend(std::coroutine_handle h); T await_resume(); std::tuple await_resume(const struct as_tuple_tag & ); system::result await_resume(const struct as_result_tag &); explicit operator bool() const {return chn && chn->is_open();} }; struct write_op : intrusive::list_base_hook > { channel * chn; using ref_t = std::conditional_t< std::is_copy_constructible_v, variant2::variant, T*>; ref_t ref; boost::source_location loc; bool cancelled = false, direct = false; asio::cancellation_slot cancel_slot{}; unique_handle awaited_from{nullptr}; void (*begin_transaction)(void*) = nullptr; void transactional_unlink() { if (begin_transaction) begin_transaction(awaited_from.get()); this->unlink(); } struct cancel_impl; bool await_ready() { return !chn->buffer_.full(); } template BOOST_NOINLINE std::coroutine_handle await_suspend(std::coroutine_handle h); void await_resume(); std::tuple await_resume(const struct as_tuple_tag & ); system::result await_resume(const struct as_result_tag &); explicit operator bool() const {return chn && chn->is_open();} }; boost::intrusive::list > read_queue_; boost::intrusive::list > write_queue_; public: read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; } #if defined(BOOST_WINDOWS_API) BOOST_NOINLINE #endif write_op write(const T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) requires std::is_copy_constructible_v { return write_op{{}, this, &value, loc}; } #if defined(BOOST_WINDOWS_API) BOOST_NOINLINE #endif write_op write(const T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) requires std::is_copy_constructible_v { return write_op{{}, this, &value, loc}; } #if defined(BOOST_WINDOWS_API) BOOST_NOINLINE #endif write_op write( T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) { return write_op{{}, this, &value, loc}; } #if defined(BOOST_WINDOWS_API) BOOST_NOINLINE #endif write_op write( T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION) { return write_op{{}, this, &value, loc}; } /* // tag::outline[] // an awaitable that yields T using __read_op__ = __unspecified__; // an awaitable that yields void using __write_op__ = __unspecified__; // read a value to a channel __read_op__ read(); // write a value to the channel __write_op__ write(const T && value); __write_op__ write(const T & value); __write_op__ write( T && value); __write_op__ write( T & value); // write a value to the channel if T is void __write_op__ write(); // end::outline[] */ // tag::outline[] }; // end::outline[] template<> struct channel { explicit channel(std::size_t limit = 0u, executor executor = this_thread::get_executor()) : limit_(limit), executor_(executor) {} channel(channel &&) noexcept = delete; channel & operator=(channel && lhs) noexcept = delete; using executor_type = executor; const executor_type & get_executor() {return executor_;} BOOST_COBALT_DECL ~channel(); bool is_open() const {return !is_closed_;} BOOST_COBALT_DECL void close(); private: std::size_t limit_; std::size_t n_{0u}; executor_type executor_; bool is_closed_{false}; struct read_op : intrusive::list_base_hook > { channel * chn; boost::source_location loc; bool cancelled = false, direct = false; asio::cancellation_slot cancel_slot{}; unique_handle awaited_from{nullptr}; void (*begin_transaction)(void*) = nullptr; void transactional_unlink() { if (begin_transaction) begin_transaction(awaited_from.get()); this->unlink(); } struct cancel_impl; bool await_ready() { return (chn->n_ > 0); } template BOOST_NOINLINE std::coroutine_handle await_suspend(std::coroutine_handle h); BOOST_COBALT_DECL void await_resume(); BOOST_COBALT_DECL std::tuple await_resume(const struct as_tuple_tag & ); BOOST_COBALT_DECL system::result await_resume(const struct as_result_tag &); explicit operator bool() const {return chn && chn->is_open();} }; struct write_op : intrusive::list_base_hook > { channel * chn; boost::source_location loc; bool cancelled = false, direct = false; asio::cancellation_slot cancel_slot{}; unique_handle awaited_from{nullptr}; void (*begin_transaction)(void*) = nullptr; void transactional_unlink() { if (begin_transaction) begin_transaction(awaited_from.get()); this->unlink(); } struct cancel_impl; bool await_ready() { return chn->n_ < chn->limit_; } template BOOST_NOINLINE std::coroutine_handle await_suspend(std::coroutine_handle h); BOOST_COBALT_DECL void await_resume(); BOOST_COBALT_DECL std::tuple await_resume(const struct as_tuple_tag & ); BOOST_COBALT_DECL system::result await_resume(const struct as_result_tag &); explicit operator bool() const {return chn && chn->is_open();} }; boost::intrusive::list > read_queue_; boost::intrusive::list > write_queue_; public: read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; } write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{{}, this, loc}; } }; template struct channel_reader { channel_reader(channel & chan, const boost::source_location & loc = BOOST_CURRENT_LOCATION) : chan_(&chan), loc_(loc) {} auto operator co_await () { return chan_->read(loc_); } explicit operator bool () const {return chan_ && chan_->is_open();} private: channel * chan_; boost::source_location loc_; }; } #include #endif //BOOST_COBALT_CHANNEL_HPP