123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- //
- // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- #ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
- #define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
- #include <boost/mysql/error_code.hpp>
- #include <boost/mysql/detail/any_resumable_ref.hpp>
- #include <boost/mysql/detail/engine.hpp>
- #include <boost/mysql/detail/next_action.hpp>
- #include <boost/mysql/impl/internal/coroutine.hpp>
- #include <boost/asio/any_io_executor.hpp>
- #include <boost/asio/buffer.hpp>
- #include <boost/asio/compose.hpp>
- #include <boost/asio/post.hpp>
- #include <boost/assert.hpp>
- #include <cstddef>
- #include <utility>
- namespace boost {
- namespace mysql {
- namespace detail {
- inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
- {
- return asio::mutable_buffer(buff.data(), buff.size());
- }
- template <class EngineStream>
- struct run_algo_op
- {
- int resume_point_{0};
- EngineStream& stream_;
- any_resumable_ref resumable_;
- bool has_done_io_{false};
- error_code stored_ec_;
- run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}
- template <class Self>
- void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)
- {
- next_action act;
- switch (resume_point_)
- {
- case 0:
- while (true)
- {
- // Run the op
- act = resumable_.resume(io_ec, bytes_transferred);
- if (act.is_done())
- {
- stored_ec_ = act.error();
- if (!has_done_io_)
- {
- BOOST_MYSQL_YIELD(
- resume_point_,
- 1,
- asio::post(stream_.get_executor(), std::move(self))
- )
- }
- self.complete(stored_ec_);
- return;
- }
- else if (act.type() == next_action_type::read)
- {
- BOOST_MYSQL_YIELD(
- resume_point_,
- 2,
- stream_.async_read_some(
- to_buffer(act.read_args().buffer),
- act.read_args().use_ssl,
- std::move(self)
- )
- )
- has_done_io_ = true;
- }
- else if (act.type() == next_action_type::write)
- {
- BOOST_MYSQL_YIELD(
- resume_point_,
- 3,
- stream_.async_write_some(
- asio::buffer(act.write_args().buffer),
- act.write_args().use_ssl,
- std::move(self)
- )
- )
- has_done_io_ = true;
- }
- else if (act.type() == next_action_type::ssl_handshake)
- {
- BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))
- has_done_io_ = true;
- }
- else if (act.type() == next_action_type::ssl_shutdown)
- {
- BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))
- has_done_io_ = true;
- }
- else if (act.type() == next_action_type::connect)
- {
- BOOST_MYSQL_YIELD(resume_point_, 6, stream_.async_connect(std::move(self)))
- has_done_io_ = true;
- }
- else
- {
- BOOST_ASSERT(act.type() == next_action_type::close);
- stream_.close(io_ec);
- }
- }
- }
- }
- };
- // EngineStream is an "extended" stream concept, with the following operations:
- // using executor_type = asio::any_io_executor;
- // executor_type get_executor();
- // bool supports_ssl() const;
- // void set_endpoint(const void* endpoint);
- // std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);
- // void async_read_some(asio::mutable_buffer, bool use_ssl, CompletinToken&&);
- // std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);
- // void async_write_some(asio::const_buffer, bool use_ssl, CompletinToken&&);
- // void ssl_handshake(error_code&);
- // void async_ssl_handshake(CompletionToken&&);
- // void ssl_shutdown(error_code&);
- // void async_ssl_shutdown(CompletionToken&&);
- // void connect(error_code&);
- // void async_connect(CompletionToken&&);
- // void close(error_code&);
- // Async operations are only required to support callback types
- // See stream_adaptor for an implementation
- template <class EngineStream>
- class engine_impl final : public engine
- {
- EngineStream stream_;
- public:
- template <class... Args>
- engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)
- {
- }
- EngineStream& stream() { return stream_; }
- const EngineStream& stream() const { return stream_; }
- using executor_type = asio::any_io_executor;
- executor_type get_executor() override final { return stream_.get_executor(); }
- bool supports_ssl() const override final { return stream_.supports_ssl(); }
- void set_endpoint(const void* endpoint) override final { stream_.set_endpoint(endpoint); }
- void run(any_resumable_ref resumable, error_code& ec) override final
- {
- ec.clear();
- error_code io_ec;
- std::size_t bytes_transferred = 0;
- while (true)
- {
- // Run the op
- auto act = resumable.resume(io_ec, bytes_transferred);
- // Apply the next action
- bytes_transferred = 0;
- if (act.is_done())
- {
- ec = act.error();
- return;
- }
- else if (act.type() == next_action_type::read)
- {
- bytes_transferred = stream_.read_some(
- to_buffer(act.read_args().buffer),
- act.read_args().use_ssl,
- io_ec
- );
- }
- else if (act.type() == next_action_type::write)
- {
- bytes_transferred = stream_.write_some(
- asio::buffer(act.write_args().buffer),
- act.write_args().use_ssl,
- io_ec
- );
- }
- else if (act.type() == next_action_type::ssl_handshake)
- {
- stream_.ssl_handshake(io_ec);
- }
- else if (act.type() == next_action_type::ssl_shutdown)
- {
- stream_.ssl_shutdown(io_ec);
- }
- else if (act.type() == next_action_type::connect)
- {
- stream_.connect(io_ec);
- }
- else
- {
- BOOST_ASSERT(act.type() == next_action_type::close);
- stream_.close(io_ec);
- }
- }
- }
- void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)
- override final
- {
- return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(
- run_algo_op<EngineStream>(stream_, resumable),
- h,
- stream_
- );
- }
- };
- } // namespace detail
- } // namespace mysql
- } // namespace boost
- #endif
|