12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768 |
- #ifndef __ASIO2_TCP_CLIENT_HPP__
- #define __ASIO2_TCP_CLIENT_HPP__
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- #pragma once
- #endif
- #include <asio2/base/detail/push_options.hpp>
- #include <asio2/base/client.hpp>
- #include <asio2/tcp/impl/tcp_keepalive_cp.hpp>
- #include <asio2/tcp/impl/tcp_send_op.hpp>
- #include <asio2/tcp/impl/tcp_recv_op.hpp>
- namespace asio2::detail
- {
- /**
-  * Compile-time argument bundle used as the default args_t of tcp_client_impl_t.
-  * It marks the endpoint role and names the socket, buffer and payload view types.
-  */
- struct template_args_tcp_client : public tcp_tag
- {
-     // Types the implementation classes pull out of args_t.
-     using socket_t    = asio::ip::tcp::socket;
-     using buffer_t    = asio::streambuf;
-     using send_data_t = std::string_view;
-     using recv_data_t = std::string_view;
-     // Role flags: exactly one of them is set, and it is the client one.
-     static constexpr bool is_client  = true;
-     static constexpr bool is_session = false;
-     static constexpr bool is_server  = false;
- };
- ASIO2_CLASS_FORWARD_DECLARE_BASE;
- ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
- ASIO2_CLASS_FORWARD_DECLARE_TCP_CLIENT;
- template<class derived_t, class args_t = template_args_tcp_client>
- class tcp_client_impl_t
- : public client_impl_t <derived_t, args_t>
- , public tcp_keepalive_cp <derived_t, args_t>
- , public tcp_send_op <derived_t, args_t>
- , public tcp_recv_op <derived_t, args_t>
- , public tcp_tag
- {
- ASIO2_CLASS_FRIEND_DECLARE_BASE;
- ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
- ASIO2_CLASS_FRIEND_DECLARE_TCP_CLIENT;
- public:
- using super = client_impl_t <derived_t, args_t>;
- using self = tcp_client_impl_t<derived_t, args_t>;
- using args_type = args_t;
- using buffer_type = typename args_t::buffer_t;
- using send_data_t = typename args_t::send_data_t;
- using recv_data_t = typename args_t::recv_data_t;
- public:
-
- /**
-  * @brief Construct the client from buffer limits and a concurrency value.
-  * @param init_buf_size Passed to the client_impl_t base; defaults to tcp_frame_size.
-  * @param max_buf_size  Passed to the client_impl_t base; defaults to max_buffer_size.
-  * @param concurrency   Passed to the client_impl_t base; defaults to 1.
-  */
- explicit tcp_client_impl_t(
-     std::size_t init_buf_size = tcp_frame_size,
-     std::size_t max_buf_size  = max_buffer_size,
-     std::size_t concurrency   = 1)
-     : super(init_buf_size, max_buf_size, concurrency)
-     , tcp_keepalive_cp<derived_t, args_t>()
-     , tcp_send_op     <derived_t, args_t>()
-     , tcp_recv_op     <derived_t, args_t>()
- {
-     // Start out with tcp_connect_timeout (taken as milliseconds) as the connect timeout.
-     this->set_connect_timeout(std::chrono::milliseconds(tcp_connect_timeout));
- }
- /**
-  * @brief Construct the client from buffer limits and a caller-supplied scheduler object.
-  * Only participates in overload resolution when Scheduler is not an integral type,
-  * so it cannot be confused with the (size, size, concurrency) constructor.
-  * @param init_buf_size Passed to the client_impl_t base.
-  * @param max_buf_size  Passed to the client_impl_t base.
-  * @param scheduler     Perfect-forwarded to the client_impl_t base.
-  */
- template<class Scheduler,
-     std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
- explicit tcp_client_impl_t(
-     std::size_t init_buf_size,
-     std::size_t max_buf_size,
-     Scheduler&& scheduler)
-     : super(init_buf_size, max_buf_size, std::forward<Scheduler>(scheduler))
-     , tcp_keepalive_cp<derived_t, args_t>()
-     , tcp_send_op     <derived_t, args_t>()
-     , tcp_recv_op     <derived_t, args_t>()
- {
-     // Start out with tcp_connect_timeout (taken as milliseconds) as the connect timeout.
-     this->set_connect_timeout(std::chrono::milliseconds(tcp_connect_timeout));
- }
- /**
-  * @brief Construct the client from a scheduler only.
-  * Delegates to the three-argument scheduler constructor with tcp_frame_size and
-  * max_buffer_size as the buffer limits. Disabled for integral Scheduler types.
-  * @param scheduler Perfect-forwarded to the delegated constructor.
-  */
- template<class Scheduler,
-     std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
- explicit tcp_client_impl_t(Scheduler&& scheduler)
-     : tcp_client_impl_t(tcp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))
- {
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- /**
-  * @brief Destructor; invokes stop() before the object is torn down.
-  */
- ~tcp_client_impl_t()
- {
-     this->stop();
- }
-
- /**
-  * @brief Start the client through the IsAsync = false variant of _do_connect.
-  * @param host Perfect-forwarded to _do_connect.
-  * @param port Perfect-forwarded to _do_connect.
-  * @param args Passed to ecs_helper::make_ecs after asio::transfer_at_least(1).
-  * @return The value returned by _do_connect<false>.
-  */
- template<typename String, typename StrOrInt, typename... Args>
- inline bool start(String&& host, StrOrInt&& port, Args&&... args)
- {
-     derived_t& derive = this->derived();
-     auto ecs = ecs_helper::make_ecs(asio::transfer_at_least(1), std::forward<Args>(args)...);
-     return derive.template _do_connect<false>(
-         std::forward<String>(host), std::forward<StrOrInt>(port), std::move(ecs));
- }
-
- /**
-  * @brief Start the client through the IsAsync = true variant of _do_connect.
-  * @param host Perfect-forwarded to _do_connect.
-  * @param port Perfect-forwarded to _do_connect.
-  * @param args Passed to ecs_helper::make_ecs after asio::transfer_at_least(1).
-  * @return The value returned by _do_connect<true>.
-  */
- template<typename String, typename StrOrInt, typename... Args>
- inline bool async_start(String&& host, StrOrInt&& port, Args&&... args)
- {
-     derived_t& derive = this->derived();
-     auto ecs = ecs_helper::make_ecs(asio::transfer_at_least(1), std::forward<Args>(args)...);
-     return derive.template _do_connect<true>(
-         std::forward<String>(host), std::forward<StrOrInt>(port), std::move(ecs));
- }
-
- /**
-  * @brief Stop the client and then the io pool.
-  *
-  * Does nothing when the io pool is already stopped. Otherwise it unregisters the object,
-  * queues an event that runs _stop_reconnect_timer -> _do_disconnect -> _do_stop, and,
-  * unless running_in_this_thread() is true, waits for that chain to finish before
-  * calling stop_iopool().
-  */
- inline void stop()
- {
-     if (this->is_iopool_stopped())
-         return;
-     derived_t& derive = this->derived();
-     derive.io_->unregobj(&derive);
-     std::promise<state_t> stopped_promise;
-     std::future<state_t> stopped_future = stopped_promise.get_future();
-     // Publishes the current state into the promise when the guard is destroyed.
-     detail::defer_event stopped_guard
-     {
-         [this, state_promise = std::move(stopped_promise)]() mutable
-         {
-             state_promise.set_value(this->state_.load());
-         }
-     };
-     derive.post_event([&derive, this_ptr = derive.selfptr(), stopped_guard = std::move(stopped_guard)]
-     (event_queue_guard<derived_t> g) mutable
-     {
-         derive._stop_reconnect_timer();
-         derive._do_disconnect(asio::error::operation_aborted, derive.selfptr(), defer_event
-         {
-             [&derive, this_ptr = std::move(this_ptr), stopped_guard = std::move(stopped_guard)]
-             (event_queue_guard<derived_t> g) mutable
-             {
-                 derive._do_stop(asio::error::operation_aborted, std::move(this_ptr), defer_event
-                 {
-                     [stopped_guard = std::move(stopped_guard)](event_queue_guard<derived_t> g) mutable
-                     {
-                         detail::ignore_unused(stopped_guard, g);
-                         // Move the guard into a scoped temporary so it is destroyed right here.
-                         {
-                             [[maybe_unused]] detail::defer_event t{ std::move(stopped_guard) };
-                         }
-                     }, std::move(g)
-                 });
-             }, std::move(g)
-         });
-     });
-     // Poll in 100 ms slices; the loop is skipped entirely when running_in_this_thread() is true.
-     while (!derive.running_in_this_thread())
-     {
-         if (stopped_future.wait_for(std::chrono::milliseconds(100)) == std::future_status::ready)
-         {
-             ASIO2_ASSERT(stopped_future.get() == state_t::stopped);
-             break;
-         }
-         // Timed out: give up waiting if no thread id is recorded or the io context has stopped.
-         if (derive.get_thread_id() == std::thread::id{} || derive.io_->context().stopped())
-             break;
-     }
-     this->stop_iopool();
- }
- public:
-
- /**
-  * @brief Register the callback stored under event_type::recv.
-  * The callback is wrapped in an observer_t<std::string_view>.
-  * @param fun Callable, perfect-forwarded into the observer.
-  * @param obj Optional extra arguments, perfect-forwarded into the observer after fun.
-  * @return Reference to the derived object, allowing calls to be chained.
-  */
- template<class F, class ...C>
- inline derived_t & bind_recv(F&& fun, C&&... obj)
- {
-     auto observer = observer_t<std::string_view>(std::forward<F>(fun), std::forward<C>(obj)...);
-     this->listener_.bind(event_type::recv, std::move(observer));
-     return this->derived();
- }
-
- /**
-  * @brief Register the callback stored under event_type::connect.
-  * The callback is wrapped in an observer_t<> (no event arguments).
-  * @param fun Callable, perfect-forwarded into the observer.
-  * @param obj Optional extra arguments, perfect-forwarded into the observer after fun.
-  * @return Reference to the derived object, allowing calls to be chained.
-  */
- template<class F, class ...C>
- inline derived_t & bind_connect(F&& fun, C&&... obj)
- {
-     auto observer = observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...);
-     this->listener_.bind(event_type::connect, std::move(observer));
-     return this->derived();
- }
-
- /**
-  * @brief Register the callback stored under event_type::disconnect.
-  * The callback is wrapped in an observer_t<> (no event arguments).
-  * @param fun Callable, perfect-forwarded into the observer.
-  * @param obj Optional extra arguments, perfect-forwarded into the observer after fun.
-  * @return Reference to the derived object, allowing calls to be chained.
-  */
- template<class F, class ...C>
- inline derived_t & bind_disconnect(F&& fun, C&&... obj)
- {
-     auto observer = observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...);
-     this->listener_.bind(event_type::disconnect, std::move(observer));
-     return this->derived();
- }
-
- /**
-  * @brief Register the callback stored under event_type::init.
-  * The callback is wrapped in an observer_t<> (no event arguments).
-  * @param fun Callable, perfect-forwarded into the observer.
-  * @param obj Optional extra arguments, perfect-forwarded into the observer after fun.
-  * @return Reference to the derived object, allowing calls to be chained.
-  */
- template<class F, class ...C>
- inline derived_t & bind_init(F&& fun, C&&... obj)
- {
-     auto observer = observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...);
-     this->listener_.bind(event_type::init, std::move(observer));
-     return this->derived();
- }
- protected:
- /**
-  * @brief Shared implementation behind start() and async_start().
-  *
-  * Starts the io pool, queues an event that moves the state from stopped to starting,
-  * stores host/port/ecs, runs the init hooks and calls _start_connect<IsAsync>.
-  *
-  * @tparam IsAsync When true the function returns right after queuing the event.
-  * @param host Target host; converted with detail::to_string inside the queued event.
-  * @param port Target port; converted with detail::to_string inside the queued event.
-  * @param ecs  Shared ecs_t object stored in ecs_ and handed to the init hooks.
-  * @return false when the io pool could not be started; true for IsAsync; otherwise
-  *         whether the reported error code is empty, or is_started() when the call is
-  *         made from the io thread itself.
-  */
- template<bool IsAsync, typename String, typename StrOrInt, typename C>
- inline bool _do_connect(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
- {
-     derived_t& derive = this->derived();
- #if defined(ASIO2_ENABLE_LOG)
-     asio2::detail::get_logger();
- #endif
-     this->start_iopool();
-     if (!this->is_iopool_started())
-     {
-         set_last_error(asio::error::operation_aborted);
-         return false;
-     }
-     // Have the io context call init_thread_id(); this_ptr is captured only to hold a reference.
-     asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable
-     {
-         detail::ignore_unused(this_ptr);
-         derive.io_->init_thread_id();
-     });
-     std::promise<error_code> result_promise;
-     std::future<error_code> result_future = result_promise.get_future();
-     // Publishes the last error into the promise when the guard is destroyed.
-     detail::defer_event result_guard
-     {
-         [result_promise = std::move(result_promise)]() mutable
-         {
-             result_promise.set_value(get_last_error());
-         }
-     };
-     derive.post_event(
-     [this, this_ptr = derive.selfptr(), ecs_ptr = std::move(ecs),
-         target_host = std::forward<String>(host), target_port = std::forward<StrOrInt>(port),
-         result_guard = std::move(result_guard)]
-     (event_queue_guard<derived_t> guard) mutable
-     {
-         derived_t& derive = this->derived();
-         // Completion chain: owns the result guard and the queue guard.
-         defer_event completion
-         {
-             [result_guard = std::move(result_guard)](event_queue_guard<derived_t> g) mutable
-             {
-                 detail::ignore_unused(result_guard, g);
-                 // Move the guard into a scoped temporary so it is destroyed right here.
-                 {
-                     [[maybe_unused]] detail::defer_event t{ std::move(result_guard) };
-                 }
-             }, std::move(guard)
-         };
-         // Only a stopped client may start; any other state reports already_started.
-         state_t idle = state_t::stopped;
-         if (!derive.state_.compare_exchange_strong(idle, state_t::starting))
-         {
-             set_last_error(asio::error::already_started);
-             return;
-         }
-         derive.ecs_ = ecs_ptr;
-         clear_last_error();
-         derive.io_->regobj(&derive);
- #if defined(_DEBUG) || defined(DEBUG)
-         this->is_stop_reconnect_timer_called_ = false;
-         this->is_post_reconnect_timer_called_ = false;
-         this->is_stop_connect_timeout_timer_called_ = false;
-         this->is_disconnect_called_ = false;
- #endif
-         this->host_ = detail::to_string(std::move(target_host));
-         this->port_ = detail::to_string(std::move(target_port));
-         super::start();
-         derive._do_init(ecs_ptr);
-         derive._rdc_init(ecs_ptr);
-         derive._socks5_init(ecs_ptr);
-         derive.template _start_connect<IsAsync>(
-             std::move(this_ptr), std::move(ecs_ptr), std::move(completion));
-     });
-     if constexpr (IsAsync)
-     {
-         set_last_error(asio::error::in_progress);
-         return true;
-     }
-     else
-     {
-         if (!derive.io_->running_in_this_thread())
-         {
-             // Block until the completion chain has published its error code.
-             set_last_error(result_future.get());
-             return static_cast<bool>(!get_last_error());
-         }
-         // Called from the io thread: the future is not waited on, so report in_progress.
-         set_last_error(asio::error::in_progress);
-         return derive.is_started();
-     }
- }
- /**
-  * @brief Set dgram_ according to the lowest condition type of the ecs.
-  * dgram_ becomes true exactly when ecs_t<C>::condition_lowest_type is use_dgram_t.
-  * The ecs argument itself is not read.
-  */
- template<typename C>
- inline void _do_init(std::shared_ptr<ecs_t<C>>&) noexcept
- {
- #if defined(ASIO2_ENABLE_LOG)
-     static_assert(tcp_send_op<derived_t, args_t>::template has_member_dgram<self>::value,
-         "The behavior of different compilers is not consistent");
- #endif
-     this->dgram_ = std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_dgram_t>;
- }
- /**
-  * @brief Refresh the alive time, reset the connect time and start receiving.
-  * @param this_ptr Shared pointer to the derived object, moved on to _start_recv.
-  * @param ecs      Shared ecs_t object, moved on to _start_recv.
-  * @param chain    Deferred event chain, moved on to _start_recv.
-  */
- template<typename C, typename DeferEvent>
- inline void _do_start(
-     std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
-     derived_t& derive = this->derived();
-     derive.update_alive_time();
-     derive.reset_connect_time();
-     derive._start_recv(std::move(this_ptr), std::move(ecs), std::move(chain));
- }
- /**
-  * @brief Stop the rdc component, close the socket if it is open, then defer to the base class.
-  *
-  * For an open socket: shutdown(shutdown_both) is issued only when the linger option was
-  * read without error and is not "enabled with a zero timeout"; cancel() and close() are
-  * always issued, and their error codes are discarded.
-  *
-  * @param ec       Error code describing why the connection is going away.
-  * @param this_ptr Shared pointer to the derived object, moved on to the base class.
-  * @param chain    Deferred event chain, moved on to the base class.
-  */
- template<typename DeferEvent>
- inline void _handle_disconnect(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
- {
-     ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
-     ASIO2_ASSERT(this->state_ == state_t::stopped);
-     ASIO2_LOG_DEBUG("tcp_client::_handle_disconnect: {} {}", ec.value(), ec.message());
-     this->derived()._rdc_stop();
-     auto&& sock = this->socket();
-     if (sock.is_open())
-     {
-         error_code ec_linger{}, ec_ignore{};
-         asio::socket_base::linger linger_opt{};
-         sock.lowest_layer().get_option(linger_opt, ec_linger);
-         if (!ec_linger)
-         {
-             // Skip the shutdown when linger is enabled with a zero timeout.
-             const bool zero_timeout_linger = linger_opt.enabled() && linger_opt.timeout() == 0;
-             if (!zero_timeout_linger)
-             {
-                 sock.shutdown(asio::socket_base::shutdown_both, ec_ignore);
-             }
-         }
-         sock.cancel(ec_ignore);
-         sock.close(ec_ignore);
-     }
-     super::_handle_disconnect(ec, std::move(this_ptr), std::move(chain));
- }
- /**
-  * @brief Second stage of stop(): asserts the state is stopped and hands over to _post_stop.
-  * @param ec       Error code passed on to _post_stop.
-  * @param this_ptr Shared pointer to the derived object, moved on to _post_stop.
-  * @param chain    Deferred event chain, moved on to _post_stop.
-  */
- template<typename DeferEvent>
- inline void _do_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
- {
-     ASIO2_ASSERT(this->state_ == state_t::stopped);
-     derived_t& derive = this->derived();
-     derive._post_stop(ec, std::move(this_ptr), std::move(chain));
- }
- /**
-  * @brief Queue the final stop step through disp_event.
-  *
-  * The queued handler records ec as the last error, rebuilds the deferred chain from the
-  * moved-out event and the handler's own queue guard, calls the base class stop() and then
-  * _handle_stop.
-  *
-  * @param ec       Error code recorded as the last error and passed to _handle_stop.
-  * @param this_ptr Shared pointer to the derived object, kept alive inside the handler.
-  * @param chain    Deferred event chain; its event and guard are moved out separately.
-  */
- template<typename DeferEvent>
- inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
- {
-     this->derived().disp_event(
-     [this, ec, this_ptr = std::move(this_ptr), evt = chain.move_event()]
-     (event_queue_guard<derived_t> guard) mutable
-     {
-         set_last_error(ec);
-         defer_event rebuilt(std::move(evt), std::move(guard));
-         super::stop();
-         this->derived()._handle_stop(ec, std::move(this_ptr), std::move(rebuilt));
-     }, chain.move_guard());
- }
- /**
-  * @brief Last stop step: stops the socks5 component and asserts the state is stopped.
-  * None of the three parameters is used; they are only passed to ignore_unused.
-  */
- template<typename DeferEvent>
- inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
- {
-     detail::ignore_unused(ec, this_ptr, chain);
-     derived_t& derive = this->derived();
-     derive._socks5_stop();
-     ASIO2_ASSERT(this->state_ == state_t::stopped);
- }
- /**
-  * @brief Dispatch a handler onto the io context that posts the first receive.
-  *
-  * Unless the lowest condition type is hook_buffer_t, the handler first discards everything
-  * currently held in the receive buffer. The chain is captured by the handler but not
-  * otherwise used there.
-  *
-  * @param this_ptr Shared pointer to the derived object, moved on to _post_recv.
-  * @param ecs      Shared ecs_t object, moved on to _post_recv.
-  * @param chain    Deferred event chain, moved into the handler.
-  */
- template<typename C, typename DeferEvent>
- inline void _start_recv(
-     std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
- {
-     asio::dispatch(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
-     [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
-     () mutable
-     {
-         detail::ignore_unused(chain);
-         using lowest_condition = typename ecs_t<C>::condition_lowest_type;
-         if constexpr (std::is_same_v<lowest_condition, asio2::detail::hook_buffer_t>)
-         {
-             // Buffer content is left untouched for hook_buffer_t.
-             std::ignore = true;
-         }
-         else
-         {
-             auto&& buf = this->derived().buffer();
-             buf.consume(buf.size());
-         }
-         this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
-     }));
- }
- /**
-  * @brief Send hook: forwards to the derived object's _tcp_send.
-  * @param data     Payload, passed by reference.
-  * @param callback Completion callback, perfect-forwarded.
-  * @return The value returned by _tcp_send.
-  */
- template<class Data, class Callback>
- inline bool _do_send(Data& data, Callback&& callback)
- {
-     derived_t& derive = this->derived();
-     return derive._tcp_send(data, std::forward<Callback>(callback));
- }
- /**
-  * @brief Produce a send_data_t view over the bytes of data.
-  * The view is built from the pointer and size of asio::buffer(data); nothing is copied,
-  * so the result is only valid while data is.
-  * @param data Object accepted by asio::buffer.
-  * @return A send_data_t referring to the same bytes.
-  */
- template<class Data>
- inline send_data_t _rdc_convert_to_send_data(Data& data) noexcept
- {
-     auto buffer = asio::buffer(data);
-     // Asio buffers expose their storage as a void pointer, for which static_cast is the
-     // sufficient conversion; unlike reinterpret_cast it refuses any unrelated pointer type.
-     return send_data_t{
-         static_cast<std::string_view::const_pointer>(buffer.data()), buffer.size() };
- }
- /**
-  * @brief Call invoker with ec and empty send and receive views; does nothing if invoker is empty.
-  */
- template<class Invoker>
- inline void _rdc_invoke_with_none(const error_code& ec, Invoker& invoker)
- {
-     if (!invoker)
-         return;
-     invoker(ec, send_data_t{}, recv_data_t{});
- }
- /**
-  * @brief Call invoker with ec, an empty send view and the received data;
-  *        does nothing if invoker is empty.
-  */
- template<class Invoker>
- inline void _rdc_invoke_with_recv(const error_code& ec, Invoker& invoker, recv_data_t data)
- {
-     if (!invoker)
-         return;
-     invoker(ec, send_data_t{}, data);
- }
- /**
-  * @brief Call invoker with ec, the sent data and an empty receive view;
-  *        does nothing if invoker is empty.
-  */
- template<class Invoker>
- inline void _rdc_invoke_with_send(const error_code& ec, Invoker& invoker, send_data_t data)
- {
-     if (!invoker)
-         return;
-     invoker(ec, data, recv_data_t{});
- }
- protected:
- /**
-  * @brief Receive hook: forwards to the derived object's _tcp_post_recv.
-  * @param this_ptr Shared pointer to the derived object, moved on.
-  * @param ecs      Shared ecs_t object, moved on.
-  */
- template<typename C>
- inline void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
- {
-     derived_t& derive = this->derived();
-     derive._tcp_post_recv(std::move(this_ptr), std::move(ecs));
- }
- /**
-  * @brief Receive-completion hook: forwards to the derived object's _tcp_handle_recv.
-  * @param ec          Result of the receive operation.
-  * @param bytes_recvd Byte count reported by the receive operation.
-  * @param this_ptr    Shared pointer to the derived object, moved on.
-  * @param ecs         Shared ecs_t object, moved on.
-  */
- template<typename C>
- inline void _handle_recv(const error_code & ec, std::size_t bytes_recvd,
-     std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
- {
-     derived_t& derive = this->derived();
-     derive._tcp_handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
- }
- /**
-  * @brief Notify the listener of event_type::init.
-  * Asserts that the call happens on the io thread and that no error is pending.
-  */
- inline void _fire_init()
- {
-     ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
-     ASIO2_ASSERT(!get_last_error());
-     this->listener_.notify(event_type::init);
- }
- /**
-  * @brief Deliver received data: filter it, notify the recv listener, then feed the rdc component.
-  * @param this_ptr Shared pointer to the derived object, passed to _rdc_handle_recv.
-  * @param ecs      Shared ecs_t object, passed to _rdc_handle_recv.
-  * @param data     Received bytes; replaced by the result of call_data_filter_before_recv
-  *                 before anything else sees it.
-  */
- template<typename C>
- inline void _fire_recv(
-     std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
- {
-     derived_t& derive = this->derived();
-     data = detail::call_data_filter_before_recv(derive, data);
-     this->listener_.notify(event_type::recv, data);
-     derive._rdc_handle_recv(this_ptr, ecs, data);
- }
- /**
-  * @brief Notify the listener of event_type::connect.
-  * The rdc component is started first, but only when no error is pending; the listener is
-  * notified in either case.
-  * @param this_ptr Shared pointer to the derived object, passed to _rdc_start.
-  * @param ecs      Shared ecs_t object, passed to _rdc_start.
-  */
- template<typename C>
- inline void _fire_connect(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
- {
-     ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
- #if defined(_DEBUG) || defined(DEBUG)
-     ASIO2_ASSERT(this->is_disconnect_called_ == false);
- #endif
-     const bool no_error = !get_last_error();
-     if (no_error)
-     {
-         this->derived()._rdc_start(this_ptr, ecs);
-     }
-     this->listener_.notify(event_type::connect);
- }
- /**
-  * @brief Notify the listener of event_type::disconnect.
-  * In debug builds it also sets is_disconnect_called_.
-  * @param this_ptr Unused; only passed to ignore_unused.
-  */
- inline void _fire_disconnect(std::shared_ptr<derived_t>& this_ptr)
- {
-     detail::ignore_unused(this_ptr);
-     ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
- #if defined(_DEBUG) || defined(DEBUG)
-     this->is_disconnect_called_ = true;
- #endif
-     this->listener_.notify(event_type::disconnect);
- }
- protected:
- bool dgram_ = false;
- #if defined(_DEBUG) || defined(DEBUG)
- bool is_disconnect_called_ = false;
- #endif
- };
- }
- namespace asio2
- {
-     /// Public name of the default argument bundle for TCP clients.
-     using tcp_client_args = detail::template_args_tcp_client;
-     /// Public name of the TCP client implementation template.
-     template<class derived_t, class args_t>
-     using tcp_client_impl_t = detail::tcp_client_impl_t<derived_t, args_t>;
-     /**
-      * CRTP base for TCP clients that use the default argument bundle.
-      * Inherits every constructor of the implementation class.
-      */
-     template<class derived_t>
-     class tcp_client_t : public asio2::tcp_client_impl_t<derived_t, tcp_client_args>
-     {
-     public:
-         using asio2::tcp_client_impl_t<derived_t, tcp_client_args>::tcp_client_impl_t;
-     };
-     /**
-      * Ready-to-use TCP client: tcp_client_t instantiated with itself as the derived type.
-      */
-     class tcp_client : public tcp_client_t<tcp_client>
-     {
-     public:
-         using tcp_client_t<tcp_client>::tcp_client_t;
-     };
- }
- #if defined(ASIO2_INCLUDE_RATE_LIMIT)
- #include <asio2/tcp/tcp_stream.hpp>
- namespace asio2
- {
-     /**
-      * Argument bundle for the rate-limited client: identical to tcp_client_args except
-      * that the socket type is tcp_stream with simple_rate_policy.
-      */
-     struct tcp_rate_client_args : public tcp_client_args
-     {
-         using socket_t = asio2::tcp_stream<asio2::simple_rate_policy>;
-     };
-     /**
-      * CRTP base for TCP clients built on tcp_rate_client_args.
-      * Inherits every constructor of the implementation class.
-      */
-     template<class derived_t>
-     class tcp_rate_client_t : public tcp_client_impl_t<derived_t, tcp_rate_client_args>
-     {
-     public:
-         using tcp_client_impl_t<derived_t, tcp_rate_client_args>::tcp_client_impl_t;
-     };
-     /**
-      * Ready-to-use rate-limited TCP client: tcp_rate_client_t instantiated with itself.
-      */
-     class tcp_rate_client : public tcp_rate_client_t<tcp_rate_client>
-     {
-     public:
-         using tcp_rate_client_t<tcp_rate_client>::tcp_rate_client_t;
-     };
- }
- #endif
- #include <asio2/base/detail/pop_options.hpp>
- #endif
|