run_pipeline.hpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. //
  2. // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
  8. #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
  9. #include <boost/mysql/character_set.hpp>
  10. #include <boost/mysql/diagnostics.hpp>
  11. #include <boost/mysql/error_code.hpp>
  12. #include <boost/mysql/is_fatal_error.hpp>
  13. #include <boost/mysql/pipeline.hpp>
  14. #include <boost/mysql/detail/access.hpp>
  15. #include <boost/mysql/detail/algo_params.hpp>
  16. #include <boost/mysql/detail/next_action.hpp>
  17. #include <boost/mysql/detail/pipeline.hpp>
  18. #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
  19. #include <boost/mysql/impl/internal/sansio/execute.hpp>
  20. #include <boost/mysql/impl/internal/sansio/ping.hpp>
  21. #include <boost/mysql/impl/internal/sansio/prepare_statement.hpp>
  22. #include <boost/mysql/impl/internal/sansio/reset_connection.hpp>
  23. #include <boost/mysql/impl/internal/sansio/set_character_set.hpp>
  24. #include <boost/assert.hpp>
  25. #include <boost/core/span.hpp>
  26. #include <cstddef>
  27. #include <vector>
  28. namespace boost {
  29. namespace mysql {
  30. namespace detail {
  31. class run_pipeline_algo
  32. {
  33. union any_read_algo
  34. {
  35. std::nullptr_t nothing;
  36. read_execute_response_algo execute;
  37. read_prepare_statement_response_algo prepare_statement;
  38. read_reset_connection_response_algo reset_connection;
  39. read_ping_response_algo ping;
  40. read_set_character_set_response_algo set_character_set;
  41. any_read_algo() noexcept : nothing{} {}
  42. };
  43. diagnostics* diag_;
  44. span<const std::uint8_t> request_buffer_;
  45. span<const pipeline_request_stage> stages_;
  46. std::vector<stage_response>* response_;
  47. int resume_point_{0};
  48. std::size_t current_stage_index_{0};
  49. error_code pipeline_ec_; // Result of the entire operation
  50. bool has_hatal_error_{}; // If true, fail further stages with pipeline_ec_
  51. any_read_algo read_response_algo_;
  52. diagnostics temp_diag_;
  53. void setup_response()
  54. {
  55. if (response_)
  56. {
  57. // Create as many response items as request stages
  58. response_->resize(stages_.size());
  59. // Setup them
  60. for (std::size_t i = 0u; i < stages_.size(); ++i)
  61. {
  62. // Execution stages need to be initialized to results objects.
  63. // Otherwise, clear any previous content
  64. auto& impl = access::get_impl((*response_)[i]);
  65. if (stages_[i].kind == pipeline_stage_kind::execute)
  66. impl.emplace_results();
  67. else
  68. impl.emplace_error();
  69. }
  70. }
  71. }
  72. void setup_current_stage(const connection_state_data& st)
  73. {
  74. // Reset previous data
  75. temp_diag_.clear();
  76. // Setup read algo
  77. auto stage = stages_[current_stage_index_];
  78. switch (stage.kind)
  79. {
  80. case pipeline_stage_kind::execute:
  81. {
  82. BOOST_ASSERT(response_ != nullptr); // we don't support execution ignoring the response
  83. auto& processor = access::get_impl((*response_)[current_stage_index_]).get_processor();
  84. processor.reset(stage.stage_specific.enc, st.meta_mode);
  85. processor.sequence_number() = stage.seqnum;
  86. read_response_algo_.execute = {&temp_diag_, &processor};
  87. break;
  88. }
  89. case pipeline_stage_kind::prepare_statement:
  90. read_response_algo_.prepare_statement = {&temp_diag_, stage.seqnum};
  91. break;
  92. case pipeline_stage_kind::close_statement:
  93. // Close statement doesn't have a response
  94. read_response_algo_.nothing = nullptr;
  95. break;
  96. case pipeline_stage_kind::set_character_set:
  97. read_response_algo_.set_character_set = {&temp_diag_, stage.stage_specific.charset, stage.seqnum};
  98. break;
  99. case pipeline_stage_kind::reset_connection:
  100. read_response_algo_.reset_connection = {&temp_diag_, stage.seqnum};
  101. break;
  102. case pipeline_stage_kind::ping: read_response_algo_.ping = {&temp_diag_, stage.seqnum}; break;
  103. default: BOOST_ASSERT(false);
  104. }
  105. }
  106. void set_stage_error(error_code ec, diagnostics&& diag)
  107. {
  108. if (response_)
  109. {
  110. access::get_impl((*response_)[current_stage_index_]).set_error(ec, std::move(diag));
  111. }
  112. }
  113. void on_stage_finished(const connection_state_data& st, error_code stage_ec)
  114. {
  115. if (stage_ec)
  116. {
  117. // If the error was fatal, fail successive stages.
  118. // This error is the result of the operation
  119. if (is_fatal_error(stage_ec))
  120. {
  121. pipeline_ec_ = stage_ec;
  122. *diag_ = temp_diag_;
  123. has_hatal_error_ = true;
  124. }
  125. else if (!pipeline_ec_)
  126. {
  127. // In the absence of fatal errors, the first error we encounter is the result of the operation
  128. pipeline_ec_ = stage_ec;
  129. *diag_ = temp_diag_;
  130. }
  131. // Propagate the error
  132. if (response_ != nullptr)
  133. {
  134. set_stage_error(stage_ec, std::move(temp_diag_));
  135. }
  136. }
  137. else
  138. {
  139. if (stages_[current_stage_index_].kind == pipeline_stage_kind::prepare_statement)
  140. {
  141. // Propagate results. We don't support prepare statements ignoring the response
  142. BOOST_ASSERT(response_ != nullptr);
  143. access::get_impl((*response_)[current_stage_index_])
  144. .set_result(read_response_algo_.prepare_statement.result(st));
  145. }
  146. }
  147. }
  148. next_action resume_read_algo(connection_state_data& st, error_code ec)
  149. {
  150. switch (stages_[current_stage_index_].kind)
  151. {
  152. case pipeline_stage_kind::execute: return read_response_algo_.execute.resume(st, ec);
  153. case pipeline_stage_kind::prepare_statement:
  154. return read_response_algo_.prepare_statement.resume(st, ec);
  155. case pipeline_stage_kind::reset_connection:
  156. return read_response_algo_.reset_connection.resume(st, ec);
  157. case pipeline_stage_kind::set_character_set:
  158. return read_response_algo_.set_character_set.resume(st, ec);
  159. case pipeline_stage_kind::ping: return read_response_algo_.ping.resume(st, ec);
  160. case pipeline_stage_kind::close_statement: return next_action(); // has no response
  161. default: BOOST_ASSERT(false); return next_action();
  162. }
  163. }
  164. public:
  165. run_pipeline_algo(run_pipeline_algo_params params) noexcept
  166. : diag_(params.diag),
  167. request_buffer_(params.request_buffer),
  168. stages_(params.request_stages),
  169. response_(params.response)
  170. {
  171. }
  172. next_action resume(connection_state_data& st, error_code ec)
  173. {
  174. next_action act;
  175. switch (resume_point_)
  176. {
  177. case 0:
  178. // Clear previous state
  179. diag_->clear();
  180. setup_response();
  181. // If the request is empty, don't do anything
  182. if (stages_.empty())
  183. break;
  184. // Write the request. use_ssl is attached by top_level_algo
  185. BOOST_MYSQL_YIELD(resume_point_, 1, next_action::write({request_buffer_, false}))
  186. // If writing the request failed, fail all the stages with the given error code
  187. if (ec)
  188. {
  189. pipeline_ec_ = ec;
  190. has_hatal_error_ = true;
  191. }
  192. // For each stage
  193. for (; current_stage_index_ < stages_.size(); ++current_stage_index_)
  194. {
  195. // If there was a fatal error, just set the error and move forward
  196. if (has_hatal_error_)
  197. {
  198. set_stage_error(pipeline_ec_, diagnostics(*diag_));
  199. continue;
  200. }
  201. // Setup the stage
  202. setup_current_stage(st);
  203. // Run it until completion
  204. ec.clear();
  205. while (!(act = resume_read_algo(st, ec)).is_done())
  206. BOOST_MYSQL_YIELD(resume_point_, 2, act)
  207. // Process the stage's result
  208. on_stage_finished(st, act.error());
  209. }
  210. }
  211. return pipeline_ec_;
  212. }
  213. };
  214. } // namespace detail
  215. } // namespace mysql
  216. } // namespace boost
  217. #endif