| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 | //// Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)//// Distributed under the Boost Software License, Version 1.0. (See accompanying// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//#ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP#define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP#include <boost/mysql/error_code.hpp>#include <boost/mysql/detail/any_resumable_ref.hpp>#include <boost/mysql/detail/engine.hpp>#include <boost/mysql/detail/next_action.hpp>#include <boost/mysql/impl/internal/coroutine.hpp>#include <boost/asio/any_io_executor.hpp>#include <boost/asio/buffer.hpp>#include <boost/asio/compose.hpp>#include <boost/asio/post.hpp>#include <boost/assert.hpp>#include <cstddef>#include <utility>namespace boost {namespace mysql {namespace detail {inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept{    return asio::mutable_buffer(buff.data(), buff.size());}template <class EngineStream>struct run_algo_op{    int resume_point_{0};    EngineStream& stream_;    any_resumable_ref resumable_;    bool has_done_io_{false};    error_code stored_ec_;    run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}    template <class Self>    void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)    {        next_action act;        switch (resume_point_)        {        case 0:            while (true)            {                // Run the op                act = resumable_.resume(io_ec, bytes_transferred);                if (act.is_done())                {                    stored_ec_ = act.error();                    if (!has_done_io_)                    {                        BOOST_MYSQL_YIELD(                            resume_point_,                            1,                            asio::post(stream_.get_executor(), std::move(self))                        )                    }                    self.complete(stored_ec_);                    return;                }                else if (act.type() == next_action_type::read)                {                    BOOST_MYSQL_YIELD(                        resume_point_,                        2,                        stream_.async_read_some(                            to_buffer(act.read_args().buffer),                            act.read_args().use_ssl,                            std::move(self)                        )                    )                    has_done_io_ = true;                }                else if (act.type() == next_action_type::write)                {                    BOOST_MYSQL_YIELD(                        resume_point_,                        3,                        stream_.async_write_some(                            asio::buffer(act.write_args().buffer),                            act.write_args().use_ssl,                            std::move(self)                        )                    )                    has_done_io_ = true;                }                else if (act.type() == next_action_type::ssl_handshake)                {                    BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))                    has_done_io_ = true;                }                else if (act.type() == next_action_type::ssl_shutdown)                {                    BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))                    has_done_io_ = true;                }                else if (act.type() == next_action_type::connect)                {                    BOOST_MYSQL_YIELD(resume_point_, 6, stream_.async_connect(std::move(self)))                    has_done_io_ = true;                }                else                {                    BOOST_ASSERT(act.type() == next_action_type::close);                    stream_.close(io_ec);                }            }        }    }};// EngineStream is an "extended" stream concept, with the following operations://    using executor_type = asio::any_io_executor;//    executor_type get_executor();//    bool supports_ssl() const;//    void set_endpoint(const void* endpoint);//    std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);//    void async_read_some(asio::mutable_buffer, bool use_ssl, CompletinToken&&);//    std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);//    void async_write_some(asio::const_buffer, bool use_ssl, CompletinToken&&);//    void ssl_handshake(error_code&);//    void async_ssl_handshake(CompletionToken&&);//    void ssl_shutdown(error_code&);//    void async_ssl_shutdown(CompletionToken&&);//    void connect(error_code&);//    void async_connect(CompletionToken&&);//    void close(error_code&);// Async operations are only required to support callback types// See stream_adaptor for an implementationtemplate <class EngineStream>class engine_impl final : public engine{    EngineStream stream_;public:    template <class... Args>    engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)    {    }    EngineStream& stream() { return stream_; }    const EngineStream& stream() const { return stream_; }    using executor_type = asio::any_io_executor;    executor_type get_executor() override final { return stream_.get_executor(); }    bool supports_ssl() const override final { return stream_.supports_ssl(); }    void set_endpoint(const void* endpoint) override final { stream_.set_endpoint(endpoint); }    void run(any_resumable_ref resumable, error_code& ec) override final    {        ec.clear();        error_code io_ec;        std::size_t bytes_transferred = 0;        while (true)        {            // Run the op            auto act = resumable.resume(io_ec, bytes_transferred);            // Apply the next action            bytes_transferred = 0;            if (act.is_done())            {                ec = act.error();                return;            }            else if (act.type() == next_action_type::read)            {                bytes_transferred = stream_.read_some(                    to_buffer(act.read_args().buffer),                    act.read_args().use_ssl,                    io_ec                );            }            else if (act.type() == next_action_type::write)            {                bytes_transferred = stream_.write_some(                    asio::buffer(act.write_args().buffer),                    act.write_args().use_ssl,                    io_ec                );            }            else if (act.type() == next_action_type::ssl_handshake)            {                stream_.ssl_handshake(io_ec);            }            else if (act.type() == next_action_type::ssl_shutdown)            {                stream_.ssl_shutdown(io_ec);            }            else if (act.type() == next_action_type::connect)            {                stream_.connect(io_ec);            }            else            {                BOOST_ASSERT(act.type() == next_action_type::close);                stream_.close(io_ec);            }        }    }    void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)        override final    {        return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(            run_algo_op<EngineStream>(stream_, resumable),            h,            stream_        );    }};}  // namespace detail}  // namespace mysql}  // namespace boost#endif
 |