123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665 |
- #ifndef __ASIO2_CONNECT_COMPONENT_HPP__
- #define __ASIO2_CONNECT_COMPONENT_HPP__
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- #pragma once
- #endif
- #include <memory>
- #include <future>
- #include <utility>
- #include <string_view>
- #include <asio2/base/iopool.hpp>
- #include <asio2/base/listener.hpp>
- #include <asio2/base/detail/ecs.hpp>
- #include <asio2/base/detail/keepalive_options.hpp>
- #include <asio2/base/impl/event_queue_cp.hpp>
- namespace asio2::detail
- {
- template<class SocketT>
- class run_connect_op : public asio::coroutine
- {
- public:
- using socket_t = SocketT;
- using decay_socket_t = typename std::remove_cv_t<std::remove_reference_t<socket_t>>;
- using lowest_layer_t = typename decay_socket_t::lowest_layer_type;
- using resolver_type = typename asio::ip::basic_resolver<typename lowest_layer_t::protocol_type>;
- using endpoints_type = typename resolver_type::results_type;
- using endpoints_iterator = typename endpoints_type::iterator;
- std::string host_{}, port_{};
- SocketT& socket_;
- std::unique_ptr<resolver_type> resolver_ptr;
- std::unique_ptr<endpoints_type> endpoints_ptr;
- endpoints_iterator iter;
- template<class SKT>
- run_connect_op(std::string host, std::string port, SKT& skt)
- : host_ (std::move(host))
- , port_ (std::move(port))
- , socket_ (skt)
- {
- resolver_ptr = std::make_unique<resolver_type>(socket_.get_executor());
- }
- template <typename Self>
- void operator()(Self& self, error_code ec = {}, endpoints_type endpoints = {})
- {
- detail::ignore_unused(ec, endpoints);
- ASIO_CORO_REENTER(*this)
- {
- ASIO_CORO_YIELD
- resolver_ptr->async_resolve(host_, port_, std::move(self));
- if (ec)
- goto end;
- endpoints_ptr = std::make_unique<endpoints_type>(std::move(endpoints));
- iter = endpoints_ptr->begin();
- loop:
- ASIO_CORO_YIELD
- socket_.async_connect(iter->endpoint(), std::move(self));
- if (!ec)
- goto end;
- iter++;
- if (iter == endpoints_ptr->end())
- {
- ec = asio::error::host_unreachable;
- goto end;
- }
- else
- {
- goto loop;
- }
- end:
- self.complete(ec);
- }
- }
- };
-
- template<class SKT>
- run_connect_op(std::string, std::string, SKT&) -> run_connect_op<SKT>;
- }
- namespace asio2
- {
-
- template <typename SocketT, typename CompletionToken>
- auto async_connect(
- std::string host, std::string port, SocketT& socket, CompletionToken&& token)
- -> decltype(asio::async_compose<CompletionToken, void(asio::error_code)>(
- std::declval<detail::run_connect_op<SocketT>>(), token, socket))
- {
- return asio::async_compose<CompletionToken, void(asio::error_code)>(
- detail::run_connect_op<SocketT>{
- std::move(host), std::move(port), socket},
- token, socket);
- }
- }
- namespace asio2::detail
- {
- template<class derived_t, class args_t, bool IsSession>
- class connect_cp_member_variables;
- template<class derived_t, class args_t>
- class connect_cp_member_variables<derived_t, args_t, true>
- {
- };
- template<class derived_t, class args_t>
- class connect_cp_member_variables<derived_t, args_t, false>
- {
- public:
-
- template<typename String>
- inline derived_t& set_host(String&& host)
- {
- this->host_ = detail::to_string(std::forward<String>(host));
- return (static_cast<derived_t&>(*this));
- }
-
- template<typename StrOrInt>
- inline derived_t& set_port(StrOrInt&& port)
- {
- this->port_ = detail::to_string(std::forward<StrOrInt>(port));
- return (static_cast<derived_t&>(*this));
- }
-
- inline const std::string& get_host() noexcept
- {
- return this->host_;
- }
-
- inline const std::string& get_host() const noexcept
- {
- return this->host_;
- }
-
- inline const std::string& get_port() noexcept
- {
- return this->port_;
- }
-
- inline const std::string& get_port() const noexcept
- {
- return this->port_;
- }
- protected:
-
- std::string host_, port_;
- };
-
- template<class derived_t, class args_t>
- class connect_cp : public connect_cp_member_variables<derived_t, args_t, args_t::is_session>
- {
- public:
- using socket_t = typename args_t::socket_t;
- using decay_socket_t = typename std::remove_cv_t<std::remove_reference_t<socket_t>>;
- using lowest_layer_t = typename decay_socket_t::lowest_layer_type;
- using resolver_type = typename asio::ip::basic_resolver<typename lowest_layer_t::protocol_type>;
- using endpoints_type = typename resolver_type::results_type;
- using endpoints_iterator = typename endpoints_type::iterator;
- using self = connect_cp<derived_t, args_t>;
- public:
-
- connect_cp() noexcept {}
-
- ~connect_cp() = default;
- protected:
- template<bool IsAsync, typename C, typename DeferEvent, bool IsSession = args_t::is_session>
- inline typename std::enable_if_t<!IsSession, void>
- _start_connect(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
- clear_last_error();
- #if defined(_DEBUG) || defined(DEBUG)
- derive.is_stop_reconnect_timer_called_ = false;
- derive.is_stop_connect_timeout_timer_called_ = false;
- derive.is_disconnect_called_ = false;
- #endif
- state_t expected = state_t::starting;
- if (!derive.state_.compare_exchange_strong(expected, state_t::starting))
- {
- ASIO2_ASSERT(false);
- derive._handle_connect(asio::error::operation_aborted,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- return;
- }
- derive._make_reconnect_timer(this_ptr, ecs);
-
- derive._make_connect_timeout_timer(this_ptr, derive.get_connect_timeout());
- derive._post_resolve(std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- template<typename C>
- std::string_view _get_real_host(std::shared_ptr<derived_t>&, std::shared_ptr<ecs_t<C>>& ecs)
- {
- if constexpr (ecs_helper::has_socks5<C>())
- {
- auto sock5 = ecs->get_component().socks5_option(std::in_place);
- return sock5->host();
- }
- else
- {
- return this->host_;
- }
- }
- template<typename C>
- std::string_view _get_real_port(std::shared_ptr<derived_t>&, std::shared_ptr<ecs_t<C>>& ecs)
- {
- if constexpr (ecs_helper::has_socks5<C>())
- {
- auto sock5 = ecs->get_component().socks5_option(std::in_place);
- return sock5->port();
- }
- else
- {
- return this->port_;
- }
- }
- template<typename C, typename DeferEvent, bool IsSession = args_t::is_session>
- inline typename std::enable_if_t<!IsSession, void>
- _post_resolve(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- std::string_view h = derive._get_real_host(this_ptr, ecs);
- std::string_view p = derive._get_real_port(this_ptr, ecs);
-
- std::unique_ptr<resolver_type> resolver_ptr = std::make_unique<resolver_type>(
- derive.io_->context());
- resolver_type* resolver_rptr = resolver_ptr.get();
-
-
- resolver_rptr->async_resolve(h, p,
- [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs),
- resolver_ptr = std::move(resolver_ptr), chain = std::move(chain)]
- (error_code ec, endpoints_type endpoints) mutable
- {
-
-
- if (!derive.connect_timeout_timer_)
- {
- ec = asio::error::timed_out;
- }
- std::unique_ptr<endpoints_type> eps = std::make_unique<endpoints_type>(std::move(endpoints));
- endpoints_type* p = eps.get();
- if (ec)
- derive._handle_connect(ec,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- else
- derive._post_connect(ec, std::move(eps), p->begin(),
- std::move(this_ptr), std::move(ecs), std::move(chain));
- });
- }
- template<typename C, typename DeferEvent, bool IsSession = args_t::is_session>
- typename std::enable_if_t<!IsSession, void>
- inline _post_connect(
- error_code ec, std::unique_ptr<endpoints_type> eps, endpoints_iterator iter,
- std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
- state_t expected = state_t::starting;
- if (!derive.state_.compare_exchange_strong(expected, state_t::starting))
- {
-
- derive._handle_connect(asio::error::operation_aborted,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- return;
- }
- if (iter == eps->end())
- {
-
- derive._handle_connect(ec ? ec : asio::error::host_unreachable,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- return;
- }
- auto& socket = derive.socket();
-
-
-
- error_code ec_ignore{};
-
-
- if (socket.is_open())
- {
- auto oldep = socket.local_endpoint(ec_ignore);
- if (ec_ignore || oldep.protocol() != iter->endpoint().protocol())
- {
- socket.cancel(ec_ignore);
- socket.close(ec_ignore);
- }
- }
- if (!socket.is_open())
- {
- socket.open(iter->endpoint().protocol(), ec);
- if (ec)
- {
- derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- return;
- }
-
- socket.set_option(asio::socket_base::reuse_address(true), ec_ignore);
-
- detail::set_keepalive_options(socket);
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- clear_last_error();
-
-
-
-
- derive._do_init(ecs);
-
- derive._fire_init();
- }
- else
- {
- ASIO2_LOG_ERROR("The client socket is opened already.");
- }
-
- socket.async_connect(iter->endpoint(), make_allocator(derive.rallocator(),
- [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain),
- eps = std::move(eps), iter]
- (const error_code & ec) mutable
- {
- if (ec && ec != asio::error::operation_aborted)
- derive._post_connect(ec, std::move(eps), ++iter,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- else
- derive._post_proxy(ec,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- }));
- }
- template<typename C, typename DeferEvent>
- inline void _post_proxy(
- const error_code& ec,
- std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- set_last_error(ec);
- derived_t& derive = static_cast<derived_t&>(*this);
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
- error_code ec_ignore{};
- auto ep = derive.socket_->lowest_layer().remote_endpoint(ec_ignore);
- if (!ec_ignore)
- {
- derive.remote_endpoint_ = std::move(ep);
- }
- state_t expected = state_t::starting;
- if (!derive.state_.compare_exchange_strong(expected, state_t::starting))
- {
- derive._handle_connect(asio::error::operation_aborted,
- std::move(this_ptr), std::move(ecs), std::move(chain));
- return;
- }
- if constexpr (std::is_base_of_v<component_tag, detail::remove_cvref_t<C>>)
- {
- if (ec)
- {
- return derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
-
-
-
-
-
-
-
-
-
-
-
- if constexpr (C::has_socks5())
- derive._socks5_start(std::move(this_ptr), std::move(ecs), std::move(chain));
- else
- derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- else
- {
- derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- }
- template<typename C, typename DeferEvent>
- inline void _handle_proxy(
- const error_code& ec,
- std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- set_last_error(ec);
- derived_t& derive = static_cast<derived_t&>(*this);
- derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- template<typename C, typename DeferEvent>
- inline void _handle_connect(
- const error_code& ec,
- std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- set_last_error(ec);
- derived_t& derive = static_cast<derived_t&>(*this);
- if constexpr (args_t::is_session)
- {
- ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread());
- }
- else
- {
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
- }
- derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- template<typename C, typename DeferEvent>
- inline void _done_connect(
- error_code ec, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
-
-
- if constexpr (args_t::is_session)
- {
- ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread());
-
-
- if (!derive.socket().is_open())
- {
- ec = asio::error::timed_out;
- }
- }
- else
- {
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
-
-
-
- if (!derive.connect_timeout_timer_)
- {
- ec = asio::error::timed_out;
- }
- }
- state_t expected;
-
-
- if (!ec)
- {
- expected = state_t::starting;
- if (!derive.state_.compare_exchange_strong(expected, state_t::started))
- ec = asio::error::operation_aborted;
- }
-
- set_last_error(ec);
-
- if constexpr (args_t::is_session)
- {
- ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread());
- if (!ec)
- {
- expected = state_t::started;
- if (derive.state_.compare_exchange_strong(expected, state_t::started))
- {
- derive._fire_connect(this_ptr, ecs);
- }
- }
- }
-
- else
- {
- ASIO2_ASSERT(derive.io_->running_in_this_thread());
-
- expected = state_t::stopped;
- if (!derive.state_.compare_exchange_strong(expected, state_t::stopped))
- {
- derive._fire_connect(this_ptr, ecs);
- }
- }
- if (!ec)
- {
- expected = state_t::started;
- if (!derive.state_.compare_exchange_strong(expected, state_t::started))
- ec = asio::error::operation_aborted;
- }
-
- derive._stop_connect_timeout_timer();
-
- set_last_error(ec);
- if (ec)
- {
-
-
-
-
-
- {
- [[maybe_unused]] detail::defer_event t{ chain.move_event() };
- }
- derive._do_disconnect(ec, std::move(this_ptr), defer_event(chain.move_guard()));
- return;
- }
- derive._do_start(std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- };
- }
- #endif
|