connection_node.hpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. //
  2. // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
  8. #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
  9. #include <boost/mysql/any_connection.hpp>
  10. #include <boost/mysql/character_set.hpp>
  11. #include <boost/mysql/client_errc.hpp>
  12. #include <boost/mysql/diagnostics.hpp>
  13. #include <boost/mysql/error_code.hpp>
  14. #include <boost/mysql/pipeline.hpp>
  15. #include <boost/mysql/detail/connection_pool_fwd.hpp>
  16. #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
  17. #include <boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp>
  18. #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
  19. #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
  20. #include <boost/asio/any_io_executor.hpp>
  21. #include <boost/asio/compose.hpp>
  22. #include <boost/asio/deferred.hpp>
  23. #include <boost/asio/steady_timer.hpp>
  24. #include <boost/intrusive/list.hpp>
  25. #include <boost/intrusive/list_hook.hpp>
  26. #include <chrono>
  27. #include <utility>
  28. #include <vector>
  29. namespace boost {
  30. namespace mysql {
  31. namespace detail {
  32. // Traits to use by default for nodes. Templating on traits provides
  33. // a way to mock dependencies in tests. Production code only uses
  34. // instantiations that use io_traits.
  35. // Having this as a traits type (as opposed to individual template params)
  36. // allows us to forward-declare io_traits without having to include steady_timer
  37. struct io_traits
  38. {
  39. using connection_type = any_connection;
  40. using timer_type = asio::steady_timer;
  41. };
  42. // State shared between connection tasks
  43. template <class IoTraits>
  44. struct conn_shared_state
  45. {
  46. intrusive::list<basic_connection_node<IoTraits>> idle_list;
  47. timer_list<typename IoTraits::timer_type> pending_requests;
  48. std::size_t num_pending_connections{0};
  49. error_code last_ec;
  50. diagnostics last_diag;
  51. };
  52. // The templated type is never exposed to the user. We template
  53. // so tests can inject mocks.
  54. template <class IoTraits>
  55. class basic_connection_node : public intrusive::list_base_hook<>,
  56. public sansio_connection_node<basic_connection_node<IoTraits>>
  57. {
  58. using this_type = basic_connection_node<IoTraits>;
  59. using connection_type = typename IoTraits::connection_type;
  60. using timer_type = typename IoTraits::timer_type;
  61. // Not thread-safe, must be manipulated within the pool's executor
  62. const internal_pool_params* params_;
  63. conn_shared_state<IoTraits>* shared_st_;
  64. connection_type conn_;
  65. timer_type timer_;
  66. diagnostics connect_diag_;
  67. timer_type collection_timer_; // Notifications about collections. A separate timer makes potential race
  68. // conditions not harmful
  69. const pipeline_request* reset_pipeline_req_;
  70. std::vector<stage_response> reset_pipeline_res_;
  71. // Thread-safe
  72. std::atomic<collection_state> collection_state_{collection_state::none};
  73. // Hooks for sansio_connection_node
  74. friend class sansio_connection_node<basic_connection_node<IoTraits>>;
  75. void entering_idle()
  76. {
  77. shared_st_->idle_list.push_back(*this);
  78. shared_st_->pending_requests.notify_one();
  79. }
  80. void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
  81. void entering_pending() { ++shared_st_->num_pending_connections; }
  82. void exiting_pending() { --shared_st_->num_pending_connections; }
  83. // Helpers
  84. void propagate_connect_diag(error_code ec)
  85. {
  86. shared_st_->last_ec = ec;
  87. shared_st_->last_diag = connect_diag_;
  88. }
  89. struct connection_task_op
  90. {
  91. this_type& node_;
  92. next_connection_action last_act_{next_connection_action::none};
  93. connection_task_op(this_type& node) noexcept : node_(node) {}
  94. template <class Self>
  95. void operator()(Self& self, error_code ec = {})
  96. {
  97. // A collection status may be generated by idle_wait actions
  98. auto col_st = last_act_ == next_connection_action::idle_wait
  99. ? node_.collection_state_.exchange(collection_state::none)
  100. : collection_state::none;
  101. // Connect actions should set the shared diagnostics, so these
  102. // get reported to the user
  103. if (last_act_ == next_connection_action::connect)
  104. node_.propagate_connect_diag(ec);
  105. // Invoke the sans-io algorithm
  106. last_act_ = node_.resume(ec, col_st);
  107. // Apply the next action. run_with_timeout makes sure that all handlers
  108. // are dispatched using the timer's executor (that is, the pool executor)
  109. switch (last_act_)
  110. {
  111. case next_connection_action::connect:
  112. run_with_timeout(
  113. node_.conn_
  114. .async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
  115. node_.timer_,
  116. node_.params_->connect_timeout,
  117. std::move(self)
  118. );
  119. break;
  120. case next_connection_action::sleep_connect_failed:
  121. node_.timer_.expires_after(node_.params_->retry_interval);
  122. node_.timer_.async_wait(std::move(self));
  123. break;
  124. case next_connection_action::ping:
  125. run_with_timeout(
  126. node_.conn_.async_ping(asio::deferred),
  127. node_.timer_,
  128. node_.params_->ping_timeout,
  129. std::move(self)
  130. );
  131. break;
  132. case next_connection_action::reset:
  133. run_with_timeout(
  134. node_.conn_.async_run_pipeline(
  135. *node_.reset_pipeline_req_,
  136. node_.reset_pipeline_res_,
  137. asio::deferred
  138. ),
  139. node_.timer_,
  140. node_.params_->ping_timeout,
  141. std::move(self)
  142. );
  143. break;
  144. case next_connection_action::idle_wait:
  145. run_with_timeout(
  146. node_.collection_timer_.async_wait(asio::deferred),
  147. node_.timer_,
  148. node_.params_->ping_interval,
  149. std::move(self)
  150. );
  151. break;
  152. case next_connection_action::none: self.complete(error_code()); break;
  153. default: BOOST_ASSERT(false);
  154. }
  155. }
  156. };
  157. public:
  158. basic_connection_node(
  159. internal_pool_params& params,
  160. boost::asio::any_io_executor ex,
  161. boost::asio::any_io_executor conn_ex,
  162. conn_shared_state<IoTraits>& shared_st,
  163. const pipeline_request* reset_pipeline_req
  164. )
  165. : params_(&params),
  166. shared_st_(&shared_st),
  167. conn_(std::move(conn_ex), params.make_ctor_params()),
  168. timer_(ex),
  169. collection_timer_(ex, (std::chrono::steady_clock::time_point::max)()),
  170. reset_pipeline_req_(reset_pipeline_req)
  171. {
  172. }
  173. void cancel()
  174. {
  175. sansio_connection_node<this_type>::cancel();
  176. timer_.cancel();
  177. collection_timer_.cancel();
  178. }
  179. // This initiation must be invoked within the pool's executor
  180. template <class CompletionToken>
  181. auto async_run(CompletionToken&& token
  182. ) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
  183. {
  184. return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
  185. }
  186. connection_type& connection() noexcept { return conn_; }
  187. const connection_type& connection() const noexcept { return conn_; }
  188. // Not thread-safe, must be called within the pool's executor
  189. void notify_collectable() { collection_timer_.cancel(); }
  190. // Thread-safe. May be safely be called from any thread.
  191. void mark_as_collectable(bool should_reset) noexcept
  192. {
  193. collection_state_.store(
  194. should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
  195. );
  196. }
  197. // Exposed for testing
  198. collection_state get_collection_state() const noexcept { return collection_state_; }
  199. };
  200. } // namespace detail
  201. } // namespace mysql
  202. } // namespace boost
  203. #endif