// Copyright (c) 2023 Klemens D. Morgenstern // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) #ifndef BOOST_COBALT_DETAIL_FORK_HPP #define BOOST_COBALT_DETAIL_FORK_HPP #include #include #include #include #include #if defined(BOOST_COBALT_NO_PMR) #include #endif #include #include #include namespace boost::cobalt::detail { struct fork { fork() = default; struct shared_state { #if !defined(BOOST_COBALT_NO_PMR) pmr::monotonic_buffer_resource resource; template shared_state(Args && ... args) : resource(std::forward(args)..., this_thread::get_default_resource()) { } #else detail::monotonic_resource resource; template shared_state(Args && ... args) : resource(std::forward(args)...) { } #endif // the coro awaiting the fork statement, e.g. awaiting race unique_handle coro; std::size_t use_count = 0u; friend void intrusive_ptr_add_ref(shared_state * st) {st->use_count++;} friend void intrusive_ptr_release(shared_state * st) { if (st->use_count-- == 1u) st->coro.reset(); } bool outstanding_work() {return use_count != 0u;} const executor * exec = nullptr; bool wired_up() {return exec != nullptr;} using executor_type = executor; const executor_type & get_executor() const { BOOST_ASSERT(exec != nullptr); return *exec; } #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) boost::source_location loc; #endif }; template struct static_shared_state : private std::array, shared_state { static_shared_state() : shared_state{std::array::data(), std::array::size()} {} }; struct wired_up_t {}; constexpr static wired_up_t wired_up{}; struct set_transaction_function { void * begin_transaction_this = nullptr; void (*begin_transaction_func)(void*) = nullptr; template set_transaction_function(BeginTransaction & transaction) : begin_transaction_this(&transaction) , begin_transaction_func( +[](void * ptr) { (*static_cast(ptr))(); }) { } }; struct promise_type { template void * operator new(const std::size_t size, State & st, Rest &&...) { return st.resource.allocate(size); } void operator delete(void *) noexcept {} template promise_type(shared_state & st, Rest & ...) : state(&st) { } intrusive_ptr state; asio::cancellation_slot cancel; using executor_type = executor; const executor_type & get_executor() const { return state->get_executor(); } #if defined(BOOST_COBALT_NO_PMR) using allocator_type = detail::monotonic_allocator; const allocator_type get_allocator() const { return &state->resource; } #else using allocator_type = pmr::polymorphic_allocator; const allocator_type get_allocator() const { return &state->resource; } #endif using cancellation_slot_type = asio::cancellation_slot; cancellation_slot_type get_cancellation_slot() const { return cancel; } constexpr static std::suspend_never initial_suspend() noexcept {return {};} struct final_awaitable { promise_type * self; bool await_ready() noexcept { return self->state->use_count != 1u; } std::coroutine_handle await_suspend(std::coroutine_handle h) noexcept { auto pp = h.promise().state.detach(); #if defined(BOOST_COBALT_NO_SELF_DELETE) h.promise().~promise_type(); #else // mem is in a monotonic_resource, this is fine on msvc- gcc doesn't like it though h.destroy(); #endif pp->use_count--; BOOST_ASSERT(pp->use_count == 0u); if (pp->coro) return pp->coro.release(); else return std::noop_coroutine(); } constexpr static void await_resume() noexcept {} }; final_awaitable final_suspend() noexcept { if (cancel.is_connected()) cancel.clear(); return final_awaitable{this}; } void return_void() { } template Aw> struct wrapped_awaitable { Aw & aw; constexpr static bool await_ready() noexcept { return false; } auto await_suspend(std::coroutine_handle h) { BOOST_ASSERT(h.promise().state->wired_up()); #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING) if constexpr (requires {aw.await_suspend(h, boost::source_location ());}) return aw.await_suspend(h, h.promise().state->loc); #endif return aw.await_suspend(h); } auto await_resume() { return aw.await_resume(); } }; template Aw> auto await_transform(Aw & aw) { return wrapped_awaitable{aw}; } struct wired_up_awaitable { promise_type * promise; bool await_ready() const noexcept { return promise->state->wired_up(); } void await_suspend(std::coroutine_handle) { } constexpr static void await_resume() noexcept {} }; auto await_transform(wired_up_t) { return wired_up_awaitable{this}; } auto await_transform(set_transaction_function sf) { begin_transaction_this = sf.begin_transaction_this; begin_transaction_func = sf.begin_transaction_func; return std::suspend_never(); } auto await_transform(asio::cancellation_slot slot) { this->cancel = slot; return std::suspend_never(); } [[noreturn]] void unhandled_exception() noexcept {std::terminate();} void * begin_transaction_this = nullptr; void (*begin_transaction_func)(void*) = nullptr; void begin_transaction() { if (begin_transaction_this) begin_transaction_func(begin_transaction_this); } fork get_return_object() { return this; } }; [[nodiscard]] bool done() const { return ! handle_ || handle_.done(); } auto release() -> std::coroutine_handle { return handle_.release(); } private: fork(promise_type * pt) : handle_(pt) {} unique_handle handle_; }; } #endif //BOOST_COBALT_DETAIL_FORK_HPP