123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- //
- // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
- #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
- #include <boost/mysql/any_connection.hpp>
- #include <boost/mysql/character_set.hpp>
- #include <boost/mysql/client_errc.hpp>
- #include <boost/mysql/diagnostics.hpp>
- #include <boost/mysql/error_code.hpp>
- #include <boost/mysql/pipeline.hpp>
- #include <boost/mysql/detail/connection_pool_fwd.hpp>
- #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
- #include <boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp>
- #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
- #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
- #include <boost/asio/any_io_executor.hpp>
- #include <boost/asio/compose.hpp>
- #include <boost/asio/deferred.hpp>
- #include <boost/asio/steady_timer.hpp>
- #include <boost/intrusive/list.hpp>
- #include <boost/intrusive/list_hook.hpp>
- #include <chrono>
- #include <utility>
- #include <vector>
- namespace boost {
- namespace mysql {
- namespace detail {
- // Traits to use by default for nodes. Templating on traits provides
- // a way to mock dependencies in tests. Production code only uses
- // instantiations that use io_traits.
- // Having this as a traits type (as opposed to individual template params)
- // allows us to forward-declare io_traits without having to include steady_timer
- struct io_traits
- {
- using connection_type = any_connection;
- using timer_type = asio::steady_timer;
- };
- // State shared between connection tasks
- template <class IoTraits>
- struct conn_shared_state
- {
- intrusive::list<basic_connection_node<IoTraits>> idle_list;
- timer_list<typename IoTraits::timer_type> pending_requests;
- std::size_t num_pending_connections{0};
- error_code last_ec;
- diagnostics last_diag;
- };
- // The templated type is never exposed to the user. We template
- // so tests can inject mocks.
- template <class IoTraits>
- class basic_connection_node : public intrusive::list_base_hook<>,
- public sansio_connection_node<basic_connection_node<IoTraits>>
- {
- using this_type = basic_connection_node<IoTraits>;
- using connection_type = typename IoTraits::connection_type;
- using timer_type = typename IoTraits::timer_type;
- // Not thread-safe, must be manipulated within the pool's executor
- const internal_pool_params* params_;
- conn_shared_state<IoTraits>* shared_st_;
- connection_type conn_;
- timer_type timer_;
- diagnostics connect_diag_;
- timer_type collection_timer_; // Notifications about collections. A separate timer makes potential race
- // conditions not harmful
- const pipeline_request* reset_pipeline_req_;
- std::vector<stage_response> reset_pipeline_res_;
- // Thread-safe
- std::atomic<collection_state> collection_state_{collection_state::none};
- // Hooks for sansio_connection_node
- friend class sansio_connection_node<basic_connection_node<IoTraits>>;
- void entering_idle()
- {
- shared_st_->idle_list.push_back(*this);
- shared_st_->pending_requests.notify_one();
- }
- void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
- void entering_pending() { ++shared_st_->num_pending_connections; }
- void exiting_pending() { --shared_st_->num_pending_connections; }
- // Helpers
- void propagate_connect_diag(error_code ec)
- {
- shared_st_->last_ec = ec;
- shared_st_->last_diag = connect_diag_;
- }
- struct connection_task_op
- {
- this_type& node_;
- next_connection_action last_act_{next_connection_action::none};
- connection_task_op(this_type& node) noexcept : node_(node) {}
- template <class Self>
- void operator()(Self& self, error_code ec = {})
- {
- // A collection status may be generated by idle_wait actions
- auto col_st = last_act_ == next_connection_action::idle_wait
- ? node_.collection_state_.exchange(collection_state::none)
- : collection_state::none;
- // Connect actions should set the shared diagnostics, so these
- // get reported to the user
- if (last_act_ == next_connection_action::connect)
- node_.propagate_connect_diag(ec);
- // Invoke the sans-io algorithm
- last_act_ = node_.resume(ec, col_st);
- // Apply the next action. run_with_timeout makes sure that all handlers
- // are dispatched using the timer's executor (that is, the pool executor)
- switch (last_act_)
- {
- case next_connection_action::connect:
- run_with_timeout(
- node_.conn_
- .async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
- node_.timer_,
- node_.params_->connect_timeout,
- std::move(self)
- );
- break;
- case next_connection_action::sleep_connect_failed:
- node_.timer_.expires_after(node_.params_->retry_interval);
- node_.timer_.async_wait(std::move(self));
- break;
- case next_connection_action::ping:
- run_with_timeout(
- node_.conn_.async_ping(asio::deferred),
- node_.timer_,
- node_.params_->ping_timeout,
- std::move(self)
- );
- break;
- case next_connection_action::reset:
- run_with_timeout(
- node_.conn_.async_run_pipeline(
- *node_.reset_pipeline_req_,
- node_.reset_pipeline_res_,
- asio::deferred
- ),
- node_.timer_,
- node_.params_->ping_timeout,
- std::move(self)
- );
- break;
- case next_connection_action::idle_wait:
- run_with_timeout(
- node_.collection_timer_.async_wait(asio::deferred),
- node_.timer_,
- node_.params_->ping_interval,
- std::move(self)
- );
- break;
- case next_connection_action::none: self.complete(error_code()); break;
- default: BOOST_ASSERT(false);
- }
- }
- };
- public:
- basic_connection_node(
- internal_pool_params& params,
- boost::asio::any_io_executor ex,
- boost::asio::any_io_executor conn_ex,
- conn_shared_state<IoTraits>& shared_st,
- const pipeline_request* reset_pipeline_req
- )
- : params_(¶ms),
- shared_st_(&shared_st),
- conn_(std::move(conn_ex), params.make_ctor_params()),
- timer_(ex),
- collection_timer_(ex, (std::chrono::steady_clock::time_point::max)()),
- reset_pipeline_req_(reset_pipeline_req)
- {
- }
- void cancel()
- {
- sansio_connection_node<this_type>::cancel();
- timer_.cancel();
- collection_timer_.cancel();
- }
- // This initiation must be invoked within the pool's executor
- template <class CompletionToken>
- auto async_run(CompletionToken&& token
- ) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
- {
- return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
- }
- connection_type& connection() noexcept { return conn_; }
- const connection_type& connection() const noexcept { return conn_; }
- // Not thread-safe, must be called within the pool's executor
- void notify_collectable() { collection_timer_.cancel(); }
- // Thread-safe. May be safely be called from any thread.
- void mark_as_collectable(bool should_reset) noexcept
- {
- collection_state_.store(
- should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
- );
- }
- // Exposed for testing
- collection_state get_collection_state() const noexcept { return collection_state_; }
- };
- } // namespace detail
- } // namespace mysql
- } // namespace boost
- #endif
|