engine_impl.hpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. //
  2. // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
  8. #define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
  9. #include <boost/mysql/error_code.hpp>
  10. #include <boost/mysql/detail/any_resumable_ref.hpp>
  11. #include <boost/mysql/detail/engine.hpp>
  12. #include <boost/mysql/detail/next_action.hpp>
  13. #include <boost/mysql/impl/internal/coroutine.hpp>
  14. #include <boost/asio/any_io_executor.hpp>
  15. #include <boost/asio/buffer.hpp>
  16. #include <boost/asio/compose.hpp>
  17. #include <boost/asio/post.hpp>
  18. #include <boost/assert.hpp>
  19. #include <cstddef>
  20. #include <utility>
  21. namespace boost {
  22. namespace mysql {
  23. namespace detail {
  24. inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
  25. {
  26. return asio::mutable_buffer(buff.data(), buff.size());
  27. }
  28. template <class EngineStream>
  29. struct run_algo_op
  30. {
  31. int resume_point_{0};
  32. EngineStream& stream_;
  33. any_resumable_ref resumable_;
  34. bool has_done_io_{false};
  35. error_code stored_ec_;
  36. run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}
  37. template <class Self>
  38. void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)
  39. {
  40. next_action act;
  41. switch (resume_point_)
  42. {
  43. case 0:
  44. while (true)
  45. {
  46. // Run the op
  47. act = resumable_.resume(io_ec, bytes_transferred);
  48. if (act.is_done())
  49. {
  50. stored_ec_ = act.error();
  51. if (!has_done_io_)
  52. {
  53. BOOST_MYSQL_YIELD(
  54. resume_point_,
  55. 1,
  56. asio::post(stream_.get_executor(), std::move(self))
  57. )
  58. }
  59. self.complete(stored_ec_);
  60. return;
  61. }
  62. else if (act.type() == next_action_type::read)
  63. {
  64. BOOST_MYSQL_YIELD(
  65. resume_point_,
  66. 2,
  67. stream_.async_read_some(
  68. to_buffer(act.read_args().buffer),
  69. act.read_args().use_ssl,
  70. std::move(self)
  71. )
  72. )
  73. has_done_io_ = true;
  74. }
  75. else if (act.type() == next_action_type::write)
  76. {
  77. BOOST_MYSQL_YIELD(
  78. resume_point_,
  79. 3,
  80. stream_.async_write_some(
  81. asio::buffer(act.write_args().buffer),
  82. act.write_args().use_ssl,
  83. std::move(self)
  84. )
  85. )
  86. has_done_io_ = true;
  87. }
  88. else if (act.type() == next_action_type::ssl_handshake)
  89. {
  90. BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))
  91. has_done_io_ = true;
  92. }
  93. else if (act.type() == next_action_type::ssl_shutdown)
  94. {
  95. BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))
  96. has_done_io_ = true;
  97. }
  98. else if (act.type() == next_action_type::connect)
  99. {
  100. BOOST_MYSQL_YIELD(resume_point_, 6, stream_.async_connect(std::move(self)))
  101. has_done_io_ = true;
  102. }
  103. else
  104. {
  105. BOOST_ASSERT(act.type() == next_action_type::close);
  106. stream_.close(io_ec);
  107. }
  108. }
  109. }
  110. }
  111. };
// EngineStream is an "extended" stream concept, with the following operations:
//    using executor_type = asio::any_io_executor;
//    executor_type get_executor();
//    bool supports_ssl() const;
//    void set_endpoint(const void* endpoint);
//    std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);
//    void async_read_some(asio::mutable_buffer, bool use_ssl, CompletionToken&&);
//    std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);
//    void async_write_some(asio::const_buffer, bool use_ssl, CompletionToken&&);
//    void ssl_handshake(error_code&);
//    void async_ssl_handshake(CompletionToken&&);
//    void ssl_shutdown(error_code&);
//    void async_ssl_shutdown(CompletionToken&&);
//    void connect(error_code&);
//    void async_connect(CompletionToken&&);
//    void close(error_code&);
// Async operations are only required to support callback types.
// See stream_adaptor for an implementation.
  130. template <class EngineStream>
  131. class engine_impl final : public engine
  132. {
  133. EngineStream stream_;
  134. public:
  135. template <class... Args>
  136. engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)
  137. {
  138. }
  139. EngineStream& stream() { return stream_; }
  140. const EngineStream& stream() const { return stream_; }
  141. using executor_type = asio::any_io_executor;
  142. executor_type get_executor() override final { return stream_.get_executor(); }
  143. bool supports_ssl() const override final { return stream_.supports_ssl(); }
  144. void set_endpoint(const void* endpoint) override final { stream_.set_endpoint(endpoint); }
  145. void run(any_resumable_ref resumable, error_code& ec) override final
  146. {
  147. ec.clear();
  148. error_code io_ec;
  149. std::size_t bytes_transferred = 0;
  150. while (true)
  151. {
  152. // Run the op
  153. auto act = resumable.resume(io_ec, bytes_transferred);
  154. // Apply the next action
  155. bytes_transferred = 0;
  156. if (act.is_done())
  157. {
  158. ec = act.error();
  159. return;
  160. }
  161. else if (act.type() == next_action_type::read)
  162. {
  163. bytes_transferred = stream_.read_some(
  164. to_buffer(act.read_args().buffer),
  165. act.read_args().use_ssl,
  166. io_ec
  167. );
  168. }
  169. else if (act.type() == next_action_type::write)
  170. {
  171. bytes_transferred = stream_.write_some(
  172. asio::buffer(act.write_args().buffer),
  173. act.write_args().use_ssl,
  174. io_ec
  175. );
  176. }
  177. else if (act.type() == next_action_type::ssl_handshake)
  178. {
  179. stream_.ssl_handshake(io_ec);
  180. }
  181. else if (act.type() == next_action_type::ssl_shutdown)
  182. {
  183. stream_.ssl_shutdown(io_ec);
  184. }
  185. else if (act.type() == next_action_type::connect)
  186. {
  187. stream_.connect(io_ec);
  188. }
  189. else
  190. {
  191. BOOST_ASSERT(act.type() == next_action_type::close);
  192. stream_.close(io_ec);
  193. }
  194. }
  195. }
  196. void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)
  197. override final
  198. {
  199. return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(
  200. run_algo_op<EngineStream>(stream_, resumable),
  201. h,
  202. stream_
  203. );
  204. }
  205. };
  206. } // namespace detail
  207. } // namespace mysql
  208. } // namespace boost
  209. #endif