// // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BOOST_COBALT_DETAIL_GENERATOR_HPP #define BOOST_COBALT_DETAIL_GENERATOR_HPP #include #include #include #include #include #include #include #include #include #include #include #include namespace boost::cobalt { template struct generator; namespace detail { template struct generator_yield_awaitable; template struct generator_receiver; template struct generator_receiver_base { std::optional pushed_value; auto get_awaitable(const Push & push) requires std::is_copy_constructible_v { using impl = generator_receiver; return typename impl::awaitable{static_cast(this), &push}; } auto get_awaitable( Push && push) { using impl = generator_receiver; return typename impl::awaitable{static_cast(this), &push}; } }; template struct generator_receiver_base { bool pushed_value{false}; auto get_awaitable() { using impl = generator_receiver; return typename impl::awaitable{static_cast(this), static_cast(nullptr)}; } }; template struct generator_promise; template struct generator_receiver : generator_receiver_base { std::exception_ptr exception; std::optional result, result_buffer; Yield get_result() { if (result_buffer) { auto res = *std::exchange(result, std::nullopt); if (result_buffer) result.emplace(*std::exchange(result_buffer, std::nullopt)); return res; } else return *std::exchange(result, std::nullopt); } bool done = false; unique_handle awaited_from{nullptr}; unique_handle> yield_from{nullptr}; bool lazy = false; bool ready() { return exception || result || done; } generator_receiver(noop n) : result(std::move(n.value)), done(true) {} generator_receiver() = default; generator_receiver(generator_receiver && lhs) : generator_receiver_base{std::move(lhs.pushed_value)}, exception(std::move(lhs.exception)), result(std::move(lhs.result)), result_buffer(std::move(lhs.result_buffer)), done(lhs.done), awaited_from(std::move(lhs.awaited_from)), yield_from{std::move(lhs.yield_from)}, lazy(lhs.lazy), reference(lhs.reference), cancel_signal(lhs.cancel_signal) { if (!lhs.done && !lhs.exception) { *reference = this; lhs.exception = moved_from_exception(); } lhs.done = true; } ~generator_receiver() { if (!done && *reference == this) *reference = nullptr; } generator_receiver(generator_receiver * &reference, asio::cancellation_signal & cancel_signal) : reference(&reference), cancel_signal(&cancel_signal) { reference = this; } generator_receiver& operator=(generator_receiver && lhs) noexcept { if (*reference == this) { *reference = nullptr; } generator_receiver_base::operator=(std::move(lhs)); exception = std::move(lhs.exception); done = lhs.done; result = std::move(lhs.result); result_buffer = std::move(lhs.result_buffer); awaited_from = std::move(lhs.awaited_from); yield_from = std::move(lhs.yield_from); lazy = lhs.lazy; reference = lhs.reference; cancel_signal = lhs.cancel_signal; if (!lhs.done && !lhs.exception) { *reference = this; lhs.exception = moved_from_exception(); } lhs.done = true; return *this; } generator_receiver **reference; asio::cancellation_signal * cancel_signal; using yield_awaitable = generator_yield_awaitable; yield_awaitable get_yield_awaitable(generator_promise * pro) {return {pro}; } static yield_awaitable terminator() {return {nullptr}; } template void yield_value(T && t) { if (!result) result.emplace(std::forward(t)); else { BOOST_ASSERT(!result_buffer); result_buffer.emplace(std::forward(t)); } } struct awaitable { generator_receiver *self; std::exception_ptr ex; asio::cancellation_slot cl; variant2::variant to_push; awaitable(generator_receiver * self, Push * to_push) : self(self), to_push(to_push) { } awaitable(generator_receiver * self, const Push * to_push) : self(self), to_push(to_push) { } awaitable(const awaitable & aw) noexcept : self(aw.self), to_push(aw.to_push) { } bool await_ready() const { BOOST_ASSERT(!ex); return self->ready(); } template std::coroutine_handle await_suspend(std::coroutine_handle h) { if (self->done) // ok, so we're actually done already, so noop return std::noop_coroutine(); if (!ex && self->awaited_from != nullptr) // generator already being awaited, that's an error! ex = already_awaited(); if (ex) return h; if constexpr (requires (Promise p) {p.get_cancellation_slot();}) if ((cl = h.promise().get_cancellation_slot()).is_connected()) cl.emplace(*self->cancel_signal); self->awaited_from.reset(h.address()); std::coroutine_handle res = std::noop_coroutine(); if (self->yield_from != nullptr) res = self->yield_from.release(); if ((to_push.index() > 0) && !self->pushed_value && self->lazy) { if constexpr (std::is_void_v) self->pushed_value = true; else { if (to_push.index() == 1) self->pushed_value.emplace(std::move(*variant2::get<1>(to_push))); else { if constexpr (std::is_copy_constructible_v) self->pushed_value.emplace(std::move(*variant2::get<2>(to_push))); else { BOOST_ASSERT(!"push value is not movable"); } } } to_push = variant2::monostate{}; } return std::coroutine_handle::from_address(res.address()); } Yield await_resume(const boost::source_location & loc = BOOST_CURRENT_LOCATION) { return await_resume(as_result_tag{}).value(loc); } std::tuple await_resume( const as_tuple_tag &) { auto res = await_resume(as_result_tag{}); if (res.has_error()) return {res.error(), Yield{}}; else return {nullptr, res.value()}; } system::result await_resume(const as_result_tag& ) { if (cl.is_connected()) cl.clear(); if (ex) return {system::in_place_error, ex}; if (self->exception) return {system::in_place_error, std::exchange(self->exception, nullptr)}; if (!self->result) // missing co_return this is accepted behaviour, if the compiler agrees return {system::in_place_error, std::make_exception_ptr(std::runtime_error("cobalt::generator returned void"))}; if (to_push.index() > 0) { BOOST_ASSERT(!self->pushed_value); if constexpr (std::is_void_v) self->pushed_value = true; else { if (to_push.index() == 1) self->pushed_value.emplace(std::move(*variant2::get<1>(to_push))); else { if constexpr (std::is_copy_constructible_v) self->pushed_value.emplace(std::move(*variant2::get<2>(to_push))); else { BOOST_ASSERT(!"push value is not movable"); } } } to_push = variant2::monostate{}; } // now we also want to resume the coroutine, so it starts work if (self->yield_from != nullptr && !self->lazy) { auto exec = self->yield_from->get_executor(); auto alloc = asio::get_associated_allocator(self->yield_from); asio::post( std::move(exec), asio::bind_allocator( alloc, [y = std::exchange(self->yield_from, nullptr)]() mutable { if (y->receiver) // make sure we only resume eagerly when attached to a generator object std::move(y)(); })); } return {system::in_place_value, self->get_result()}; } void interrupt_await() & { if (!self) return ; ex = detached_exception(); if (self->awaited_from) self->awaited_from.release().resume(); } }; void interrupt_await() & { exception = detached_exception(); awaited_from.release().resume(); } void rethrow_if() { if (exception) std::rethrow_exception(exception); } }; template struct generator_promise : promise_memory_resource_base, promise_cancellation_base, promise_throw_if_cancelled_base, enable_awaitables>, enable_await_allocator>, enable_await_executor< generator_promise>, enable_await_deferred { using promise_cancellation_base::await_transform; using promise_throw_if_cancelled_base::await_transform; using enable_awaitables>::await_transform; using enable_await_allocator>::await_transform; using enable_await_executor>::await_transform; using enable_await_deferred::await_transform; [[nodiscard]] generator get_return_object() { return generator{this}; } mutable asio::cancellation_signal signal; using executor_type = executor; executor_type exec; const executor_type & get_executor() const {return exec;} template generator_promise(Args & ...args) : #if !defined(BOOST_COBALT_NO_PMR) promise_memory_resource_base(detail::get_memory_resource_from_args(args...)), #endif exec{detail::get_executor_from_args(args...)} { this->reset_cancellation_source(signal.slot()); } std::suspend_never initial_suspend() noexcept {return {};} struct final_awaitable { generator_promise * generator; bool await_ready() const noexcept { return generator->receiver && generator->receiver->awaited_from.get() == nullptr; } auto await_suspend(std::coroutine_handle h) noexcept { std::coroutine_handle res = std::noop_coroutine(); if (generator->receiver && generator->receiver->awaited_from.get() != nullptr) res = generator->receiver->awaited_from.release(); if (generator->receiver) generator->receiver->done = true; if (auto & rec = h.promise().receiver; rec != nullptr) { if (!rec->done && !rec->exception) rec->exception = detail::completed_unexpected(); rec->done = true; rec->awaited_from.reset(nullptr); rec = nullptr; } detail::self_destroy(h); return res; } void await_resume() noexcept { if (generator->receiver) generator->receiver->done = true; } }; auto final_suspend() noexcept { return final_awaitable{this}; } void unhandled_exception() { if (this->receiver) this->receiver->exception = std::current_exception(); else throw ; } void return_value(const Yield & res) requires std::is_copy_constructible_v { if (this->receiver) this->receiver->yield_value(res); } void return_value(Yield && res) { if (this->receiver) this->receiver->yield_value(std::move(res)); } generator_receiver* receiver{nullptr}; auto await_transform(this_coro::initial_t val) { if(receiver) { receiver->lazy = true; return receiver->get_yield_awaitable(this); } else return generator_receiver::terminator(); } template auto yield_value(Yield_ && ret) { if(receiver) { // if this is lazy, there might still be a value in there. receiver->yield_value(std::forward(ret)); return receiver->get_yield_awaitable(this); } else return generator_receiver::terminator(); } void interrupt_await() & { if (this->receiver) { this->receiver->exception = detached_exception(); std::coroutine_handle::from_address(this->receiver->awaited_from.release()).resume(); } } ~generator_promise() { if (this->receiver) { if (!this->receiver->done && !this->receiver->exception) this->receiver->exception = detail::completed_unexpected(); this->receiver->done = true; this->receiver->awaited_from.reset(nullptr); } } }; template struct generator_yield_awaitable { generator_promise *self; constexpr bool await_ready() const { return self && self->receiver && self->receiver->pushed_value && !self->receiver->result; } std::coroutine_handle await_suspend( std::coroutine_handle> h #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) , const boost::source_location & loc = BOOST_CURRENT_LOCATION #endif ) { if (self == nullptr) // we're a terminator, kill it { if (auto & rec = h.promise().receiver; rec != nullptr) { if (!rec->done && !rec->exception) rec->exception = detail::completed_unexpected(); rec->done = true; rec->awaited_from.reset(nullptr); rec = nullptr; } detail::self_destroy(h); return std::noop_coroutine(); } std::coroutine_handle res = std::noop_coroutine(); if (self->receiver->awaited_from.get() != nullptr) res = self->receiver->awaited_from.release(); #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) self->receiver->yield_from.reset(&h.promise(), loc); #else self->receiver->yield_from.reset(&h.promise()); #endif return res; } Push await_resume() { BOOST_ASSERT(self->receiver); BOOST_ASSERT(self->receiver->pushed_value); return *std::exchange(self->receiver->pushed_value, std::nullopt); } }; template struct generator_yield_awaitable { generator_promise *self; constexpr bool await_ready() { return self && self->receiver && self->receiver->pushed_value; } std::coroutine_handle<> await_suspend( std::coroutine_handle> h #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) , const boost::source_location & loc = BOOST_CURRENT_LOCATION #endif ) { if (self == nullptr) // we're a terminator, kill it { if (auto & rec = h.promise().receiver; rec != nullptr) { if (!rec->done && !rec->exception) rec->exception = detail::completed_unexpected(); rec->done = true; rec->awaited_from.reset(nullptr); rec = nullptr; } detail::self_destroy(h); return std::noop_coroutine(); } std::coroutine_handle res = std::noop_coroutine(); BOOST_ASSERT(self); if (self->receiver->awaited_from.get() != nullptr) res = self->receiver->awaited_from.release(); #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) self->receiver->yield_from.reset(&h.promise(), loc); #else self->receiver->yield_from.reset(&h.promise()); #endif return res; } void await_resume() { BOOST_ASSERT(self->receiver->pushed_value); self->receiver->pushed_value = false; } }; template struct generator_base { auto operator()( Push && push) { return static_cast*>(this)->receiver_.get_awaitable(std::move(push)); } auto operator()(const Push & push) requires std::is_copy_constructible_v { return static_cast*>(this)->receiver_.get_awaitable(push); } }; template struct generator_base { auto operator co_await () { return static_cast*>(this)->receiver_.get_awaitable(); } }; template struct generator_with_awaitable { generator_base &g; std::optional::awaitable> awaitable; template void await_suspend(std::coroutine_handle h) { g.cancel(); awaitable.emplace(g.operator co_await()); return awaitable->await_suspend(h); } void await_resume() {} }; } } #endif //BOOST_COBALT_DETAIL_GENERATOR_HPP