connection_pool_impl.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. //
  2. // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
  8. #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
  9. #include <boost/mysql/any_connection.hpp>
  10. #include <boost/mysql/character_set.hpp>
  11. #include <boost/mysql/client_errc.hpp>
  12. #include <boost/mysql/diagnostics.hpp>
  13. #include <boost/mysql/error_code.hpp>
  14. #include <boost/mysql/pool_params.hpp>
  15. #include <boost/mysql/detail/config.hpp>
  16. #include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
  17. #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
  18. #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
  19. #include <boost/mysql/impl/internal/connection_pool/wait_group.hpp>
  20. #include <boost/mysql/impl/internal/coroutine.hpp>
  21. #include <boost/asio/any_completion_handler.hpp>
  22. #include <boost/asio/any_io_executor.hpp>
  23. #include <boost/asio/bind_executor.hpp>
  24. #include <boost/asio/compose.hpp>
  25. #include <boost/asio/deferred.hpp>
  26. #include <boost/asio/dispatch.hpp>
  27. #include <boost/asio/error.hpp>
  28. #include <boost/asio/post.hpp>
  29. #include <boost/core/ignore_unused.hpp>
  30. #include <chrono>
  31. #include <cstddef>
  32. #include <list>
  33. #include <memory>
  34. namespace boost {
  35. namespace mysql {
  36. namespace detail {
  37. inline pipeline_request make_reset_pipeline()
  38. {
  39. pipeline_request req;
  40. req.add_reset_connection().add_set_character_set(utf8mb4_charset);
  41. return req;
  42. }
  43. // Templating on ConnectionWrapper is useful for mocking in tests.
  44. // Production code always uses ConnectionWrapper = pooled_connection.
  45. template <class IoTraits, class ConnectionWrapper>
  46. class basic_pool_impl : public std::enable_shared_from_this<basic_pool_impl<IoTraits, ConnectionWrapper>>
  47. {
  48. using this_type = basic_pool_impl<IoTraits, ConnectionWrapper>;
  49. using node_type = basic_connection_node<IoTraits>;
  50. using timer_type = typename IoTraits::timer_type;
  51. using timer_block_type = timer_block<timer_type>;
  52. using shared_state_type = conn_shared_state<IoTraits>;
  53. enum class state_t
  54. {
  55. initial,
  56. running,
  57. cancelled,
  58. };
  59. state_t state_{state_t::initial};
  60. internal_pool_params params_;
  61. asio::any_io_executor ex_;
  62. asio::any_io_executor conn_ex_;
  63. std::list<node_type> all_conns_;
  64. shared_state_type shared_st_;
  65. wait_group wait_gp_;
  66. timer_type cancel_timer_;
  67. const pipeline_request reset_pipeline_req_{make_reset_pipeline()};
  68. std::shared_ptr<this_type> shared_from_this_wrapper()
  69. {
  70. // Some compilers get confused without this explicit cast
  71. return static_cast<std::enable_shared_from_this<this_type>*>(this)->shared_from_this();
  72. }
  73. void create_connection()
  74. {
  75. all_conns_.emplace_back(params_, ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
  76. wait_gp_.run_task(all_conns_.back().async_run(asio::deferred));
  77. }
  78. error_code get_diagnostics(diagnostics* diag) const
  79. {
  80. if (state_ == state_t::cancelled)
  81. {
  82. return client_errc::cancelled;
  83. }
  84. else if (shared_st_.last_ec)
  85. {
  86. if (diag)
  87. *diag = shared_st_.last_diag;
  88. return shared_st_.last_ec;
  89. }
  90. else
  91. {
  92. return client_errc::timeout;
  93. }
  94. }
  95. struct run_op
  96. {
  97. int resume_point_{0};
  98. std::shared_ptr<this_type> obj_;
  99. run_op(std::shared_ptr<this_type> obj) noexcept : obj_(std::move(obj)) {}
  100. template <class Self>
  101. void operator()(Self& self, error_code ec = {})
  102. {
  103. // TODO: per-operation cancellation here doesn't do the right thing
  104. boost::ignore_unused(ec);
  105. switch (resume_point_)
  106. {
  107. case 0:
  108. // Ensure we run within the pool executor (possibly a strand)
  109. BOOST_MYSQL_YIELD(resume_point_, 1, asio::dispatch(obj_->ex_, std::move(self)))
  110. // Check that we're not running and set the state adequately
  111. BOOST_ASSERT(obj_->state_ == state_t::initial);
  112. obj_->state_ = state_t::running;
  113. // Create the initial connections
  114. for (std::size_t i = 0; i < obj_->params_.initial_size; ++i)
  115. obj_->create_connection();
  116. // Wait for the cancel notification to arrive.
  117. BOOST_MYSQL_YIELD(resume_point_, 2, obj_->cancel_timer_.async_wait(std::move(self)))
  118. // If the token passed to async_run had a bound executor,
  119. // the handler will be invoked within that executor.
  120. // Dispatch so we run within the pool's executor.
  121. BOOST_MYSQL_YIELD(resume_point_, 3, asio::dispatch(obj_->ex_, std::move(self)))
  122. // Deliver the cancel notification to all other tasks
  123. obj_->state_ = state_t::cancelled;
  124. for (auto& conn : obj_->all_conns_)
  125. conn.cancel();
  126. obj_->shared_st_.pending_requests.notify_all();
  127. // Wait for all connection tasks to exit
  128. BOOST_MYSQL_YIELD(resume_point_, 4, obj_->wait_gp_.async_wait(std::move(self)))
  129. // Done
  130. obj_.reset();
  131. self.complete(error_code());
  132. }
  133. }
  134. };
  135. struct get_connection_op
  136. {
  137. int resume_point_{0};
  138. std::shared_ptr<this_type> obj_;
  139. std::chrono::steady_clock::time_point timeout_;
  140. diagnostics* diag_;
  141. std::unique_ptr<timer_block_type> timer_;
  142. error_code stored_ec_;
  143. get_connection_op(
  144. std::shared_ptr<this_type> obj,
  145. std::chrono::steady_clock::time_point timeout,
  146. diagnostics* diag
  147. ) noexcept
  148. : obj_(std::move(obj)), timeout_(timeout), diag_(diag)
  149. {
  150. }
  151. template <class Self>
  152. void do_complete(Self& self, error_code ec, ConnectionWrapper conn)
  153. {
  154. // Resetting the timer will remove it from the list thanks to the auto-unlink feature
  155. timer_.reset();
  156. obj_.reset();
  157. self.complete(ec, std::move(conn));
  158. }
  159. template <class Self>
  160. void complete_success(Self& self, node_type& node)
  161. {
  162. node.mark_as_in_use();
  163. do_complete(self, error_code(), ConnectionWrapper(node, std::move(obj_)));
  164. }
  165. template <class Self>
  166. void operator()(Self& self, error_code ec = {})
  167. {
  168. switch (resume_point_)
  169. {
  170. case 0:
  171. // Clear diagnostics
  172. if (diag_)
  173. diag_->clear();
  174. // Ensure we run within the pool's executor (or the handler's) (possibly a strand)
  175. BOOST_MYSQL_YIELD(resume_point_, 1, asio::post(obj_->ex_, std::move(self)))
  176. // This loop guards us against possible race conditions
  177. // between waiting on the pending request timer and getting the connection
  178. while (true)
  179. {
  180. // If we're not running yet, or were cancelled, just return
  181. if (obj_->state_ != state_t::running)
  182. {
  183. do_complete(
  184. self,
  185. obj_->state_ == state_t::initial ? client_errc::pool_not_running
  186. : client_errc::cancelled,
  187. ConnectionWrapper()
  188. );
  189. return;
  190. }
  191. // Try to get a connection without blocking
  192. if (!obj_->shared_st_.idle_list.empty())
  193. {
  194. // There was a connection. Done.
  195. complete_success(self, obj_->shared_st_.idle_list.front());
  196. return;
  197. }
  198. // No luck. If there is room for more connections, create one.
  199. // Don't create new connections if we have other connections pending
  200. // (i.e. being connected, reset... ) - otherwise pool size increases for
  201. // no reason when there is no connectivity.
  202. if (obj_->all_conns_.size() < obj_->params_.max_size &&
  203. obj_->shared_st_.num_pending_connections == 0u)
  204. {
  205. obj_->create_connection();
  206. }
  207. // Allocate a timer to perform waits.
  208. if (!timer_)
  209. {
  210. timer_.reset(new timer_block_type(obj_->ex_));
  211. obj_->shared_st_.pending_requests.push_back(*timer_);
  212. }
  213. // Wait to be notified, or until a timeout happens
  214. timer_->timer.expires_at(timeout_);
  215. BOOST_MYSQL_YIELD(resume_point_, 2, timer_->timer.async_wait(std::move(self)))
  216. stored_ec_ = ec;
  217. // If the token passed to async_run had a bound executor,
  218. // the handler will be invoked within that executor.
  219. // Dispatch so we run within the pool's executor.
  220. BOOST_MYSQL_YIELD(resume_point_, 3, asio::dispatch(obj_->ex_, std::move(self)))
  221. if (!stored_ec_)
  222. {
  223. // We've got a timeout. Try to give as much info as possible
  224. do_complete(self, obj_->get_diagnostics(diag_), ConnectionWrapper());
  225. return;
  226. }
  227. }
  228. }
  229. }
  230. };
  231. public:
  232. basic_pool_impl(pool_executor_params&& ex_params, pool_params&& params)
  233. : params_(make_internal_pool_params(std::move(params))),
  234. ex_(std::move(ex_params.pool_executor)),
  235. conn_ex_(std::move(ex_params.connection_executor)),
  236. wait_gp_(ex_),
  237. cancel_timer_(ex_, (std::chrono::steady_clock::time_point::max)())
  238. {
  239. }
  240. using executor_type = asio::any_io_executor;
  241. executor_type get_executor() { return ex_; }
  242. template <class CompletionToken>
  243. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
  244. async_run(CompletionToken&& token)
  245. {
  246. return asio::async_compose<CompletionToken, void(error_code)>(
  247. run_op(shared_from_this_wrapper()),
  248. token,
  249. ex_
  250. );
  251. }
  252. // Not thread-safe
  253. void cancel_unsafe() { cancel_timer_.expires_at((std::chrono::steady_clock::time_point::min)()); }
  254. template <class CompletionToken>
  255. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
  256. async_get_connection(
  257. std::chrono::steady_clock::time_point timeout,
  258. diagnostics* diag,
  259. CompletionToken&& token
  260. )
  261. {
  262. return asio::async_compose<CompletionToken, void(error_code, ConnectionWrapper)>(
  263. get_connection_op(shared_from_this_wrapper(), timeout, diag),
  264. token,
  265. ex_
  266. );
  267. }
  268. template <class CompletionToken>
  269. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
  270. async_get_connection(
  271. std::chrono::steady_clock::duration timeout,
  272. diagnostics* diag,
  273. CompletionToken&& token
  274. )
  275. {
  276. return async_get_connection(
  277. timeout.count() > 0 ? std::chrono::steady_clock::now() + timeout
  278. : (std::chrono::steady_clock::time_point::max)(),
  279. diag,
  280. std::forward<CompletionToken>(token)
  281. );
  282. }
  283. // Exposed for testing
  284. std::list<node_type>& nodes() noexcept { return all_conns_; }
  285. shared_state_type& shared_state() noexcept { return shared_st_; }
  286. internal_pool_params& params() noexcept { return params_; }
  287. asio::any_io_executor connection_ex() noexcept { return conn_ex_; }
  288. const pipeline_request& reset_pipeline_request() const { return reset_pipeline_req_; }
  289. };
  290. } // namespace detail
  291. } // namespace mysql
  292. } // namespace boost
  293. #endif