client.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_CLIENT_HPP__
  11. #define __ASIO2_CLIENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <cstdint>
  17. #include <memory>
  18. #include <chrono>
  19. #include <atomic>
  20. #include <string>
  21. #include <string_view>
  22. #include <asio2/base/iopool.hpp>
  23. #include <asio2/base/log.hpp>
  24. #include <asio2/base/listener.hpp>
  25. #include <asio2/base/define.hpp>
  26. #include <asio2/base/detail/object.hpp>
  27. #include <asio2/base/detail/allocator.hpp>
  28. #include <asio2/base/detail/util.hpp>
  29. #include <asio2/base/detail/buffer_wrap.hpp>
  30. #include <asio2/base/detail/ecs.hpp>
  31. #include <asio2/base/impl/io_context_cp.hpp>
  32. #include <asio2/base/impl/thread_id_cp.hpp>
  33. #include <asio2/base/impl/connect_time_cp.hpp>
  34. #include <asio2/base/impl/alive_time_cp.hpp>
  35. #include <asio2/base/impl/user_data_cp.hpp>
  36. #include <asio2/base/impl/socket_cp.hpp>
  37. #include <asio2/base/impl/connect_cp.hpp>
  38. #include <asio2/base/impl/shutdown_cp.hpp>
  39. #include <asio2/base/impl/close_cp.hpp>
  40. #include <asio2/base/impl/disconnect_cp.hpp>
  41. #include <asio2/base/impl/user_timer_cp.hpp>
  42. #include <asio2/base/impl/post_cp.hpp>
  43. #include <asio2/base/impl/connect_timeout_cp.hpp>
  44. #include <asio2/base/impl/event_queue_cp.hpp>
  45. #include <asio2/base/impl/condition_event_cp.hpp>
  46. #include <asio2/base/impl/reconnect_timer_cp.hpp>
  47. #include <asio2/base/impl/send_cp.hpp>
  48. #include <asio2/component/rdc/rdc_call_cp.hpp>
  49. #include <asio2/component/socks/socks5_client_cp.hpp>
  50. namespace asio2
  51. {
  52. class client
  53. {
  54. public:
  55. inline constexpr static bool is_session() noexcept { return false; }
  56. inline constexpr static bool is_client () noexcept { return true ; }
  57. inline constexpr static bool is_server () noexcept { return false; }
  58. };
  59. }
  60. namespace asio2::detail
  61. {
  62. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  63. template<class derived_t, class args_t>
  64. class client_impl_t
  65. : public asio2::client
  66. , public object_t <derived_t >
  67. , public iopool_cp <derived_t, args_t>
  68. , public io_context_cp <derived_t, args_t>
  69. , public thread_id_cp <derived_t, args_t>
  70. , public event_queue_cp <derived_t, args_t>
  71. , public user_data_cp <derived_t, args_t>
  72. , public connect_time_cp <derived_t, args_t>
  73. , public alive_time_cp <derived_t, args_t>
  74. , public socket_cp <derived_t, args_t>
  75. , public connect_cp <derived_t, args_t>
  76. , public shutdown_cp <derived_t, args_t>
  77. , public close_cp <derived_t, args_t>
  78. , public disconnect_cp <derived_t, args_t>
  79. , public reconnect_timer_cp <derived_t, args_t>
  80. , public user_timer_cp <derived_t, args_t>
  81. , public connect_timeout_cp <derived_t, args_t>
  82. , public send_cp <derived_t, args_t>
  83. , public post_cp <derived_t, args_t>
  84. , public condition_event_cp <derived_t, args_t>
  85. , public rdc_call_cp <derived_t, args_t>
  86. , public socks5_client_cp <derived_t, args_t>
  87. {
  88. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  89. public:
  90. using super = object_t <derived_t >;
  91. using self = client_impl_t<derived_t, args_t>;
  92. using iopoolcp = iopool_cp <derived_t, args_t>;
  93. using args_type = args_t;
  94. using key_type = std::size_t;
  95. using buffer_type = typename args_t::buffer_t;
  96. using send_cp<derived_t, args_t>::send;
  97. using send_cp<derived_t, args_t>::async_send;
  98. public:
  99. /**
  100. * @brief constructor
  101. * @throws maybe throw exception "Too many open files" (exception code : 24)
  102. * asio::error::no_descriptors - Too many open files
  103. */
  104. template<class ThreadCountOrScheduler, class ...Args>
  105. explicit client_impl_t(
  106. std::size_t init_buf_size,
  107. std::size_t max_buf_size,
  108. ThreadCountOrScheduler&& tcos
  109. )
  110. : super()
  111. , iopool_cp <derived_t, args_t>(std::forward<ThreadCountOrScheduler>(tcos))
  112. , io_context_cp <derived_t, args_t>(iopoolcp::_get_io(0))
  113. , event_queue_cp <derived_t, args_t>()
  114. , user_data_cp <derived_t, args_t>()
  115. , connect_time_cp <derived_t, args_t>()
  116. , alive_time_cp <derived_t, args_t>()
  117. , socket_cp <derived_t, args_t>(iopoolcp::_get_io(0)->context())
  118. , connect_cp <derived_t, args_t>()
  119. , shutdown_cp <derived_t, args_t>()
  120. , close_cp <derived_t, args_t>()
  121. , disconnect_cp <derived_t, args_t>()
  122. , reconnect_timer_cp <derived_t, args_t>()
  123. , user_timer_cp <derived_t, args_t>()
  124. , connect_timeout_cp <derived_t, args_t>()
  125. , send_cp <derived_t, args_t>()
  126. , post_cp <derived_t, args_t>()
  127. , condition_event_cp <derived_t, args_t>()
  128. , rdc_call_cp <derived_t, args_t>()
  129. , rallocator_()
  130. , wallocator_()
  131. , listener_ ()
  132. , buffer_ (init_buf_size, max_buf_size)
  133. {
  134. #if defined(ASIO2_ENABLE_LOG)
  135. #if defined(ASIO2_ALLOCATOR_STORAGE_SIZE)
  136. static_assert(decltype(rallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);
  137. static_assert(decltype(wallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);
  138. #endif
  139. #endif
  140. }
  141. /**
  142. * @brief destructor
  143. */
  144. ~client_impl_t()
  145. {
  146. }
  147. /**
  148. * @brief start the client
  149. */
  150. inline bool start() noexcept
  151. {
  152. ASIO2_ASSERT(this->io_->running_in_this_thread());
  153. this->stopped_ = false;
  154. return true;
  155. }
  156. /**
  157. * @brief async start the client
  158. */
  159. inline bool async_start() noexcept
  160. {
  161. ASIO2_ASSERT(this->io_->running_in_this_thread());
  162. this->stopped_ = false;
  163. return true;
  164. }
  165. /**
  166. * @brief stop the client
  167. */
  168. inline void stop()
  169. {
  170. ASIO2_ASSERT(this->io_->running_in_this_thread());
  171. // can't use post, we need ensure when the derived stop is called, the chain
  172. // must be executed completed.
  173. this->derived().dispatch([this]() mutable
  174. {
  175. // close reconnect timer
  176. this->_stop_reconnect_timer();
  177. // close connect timeout timer
  178. this->_stop_connect_timeout_timer();
  179. // close user custom timers
  180. this->_dispatch_stop_all_timers();
  181. // close all posted timed tasks
  182. this->_dispatch_stop_all_timed_events();
  183. // close all async_events
  184. this->notify_all_condition_events();
  185. // can't use push event to close the socket, beacuse when used with websocket,
  186. // the websocket's async_close will be called, and the chain will passed into
  187. // the async_close, but the async_close will cause the chain interrupted, and
  188. // we don't know when the async_close will be completed, if another push event
  189. // was called during async_close executing, then here push event will after
  190. // the another event in the queue.
  191. // clear recv buffer
  192. this->buffer().consume(this->buffer().size());
  193. // destroy user data, maybe the user data is self shared_ptr, if
  194. // don't destroy it, will cause loop reference.
  195. // read/write user data in other thread which is not the io_context
  196. // thread maybe cause crash.
  197. this->user_data_.reset();
  198. // destroy the ecs
  199. this->ecs_.reset();
  200. //
  201. this->reset_life_id();
  202. //
  203. this->stopped_ = true;
  204. });
  205. }
  206. /**
  207. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  208. * After this function is called, this class object cannot be used again.
  209. */
  210. inline void destroy()
  211. {
  212. derived_t& derive = this->derived();
  213. derive.socket_.reset();
  214. derive.io_.reset();
  215. derive.listener_.clear();
  216. derive.destroy_iopool();
  217. }
  218. /**
  219. * @brief check whether the client is started
  220. */
  221. inline bool is_started() const
  222. {
  223. return (this->state_ == state_t::started && this->socket().is_open());
  224. }
  225. /**
  226. * @brief check whether the client is stopped
  227. */
  228. inline bool is_stopped() const
  229. {
  230. return (this->state_ == state_t::stopped && !this->socket().is_open() && this->stopped_);
  231. }
  232. /**
  233. * @brief get this object hash key
  234. */
  235. inline key_type hash_key() const noexcept
  236. {
  237. return reinterpret_cast<key_type>(this);
  238. }
  239. /**
  240. * @brief get the buffer object reference
  241. */
  242. inline buffer_wrap<buffer_type> & buffer() noexcept { return this->buffer_; }
  243. /**
  244. * @brief set the default remote call timeout for rpc/rdc
  245. */
  246. template<class Rep, class Period>
  247. inline derived_t & set_default_timeout(std::chrono::duration<Rep, Period> duration) noexcept
  248. {
  249. this->rc_timeout_ = duration;
  250. return (this->derived());
  251. }
  252. /**
  253. * @brief get the default remote call timeout for rpc/rdc
  254. */
  255. inline std::chrono::steady_clock::duration get_default_timeout() const noexcept
  256. {
  257. return this->rc_timeout_;
  258. }
  259. protected:
  260. /**
  261. * @brief get the recv/read allocator object reference
  262. */
  263. inline auto & rallocator() noexcept { return this->rallocator_; }
  264. /**
  265. * @brief get the send/write allocator object reference
  266. */
  267. inline auto & wallocator() noexcept { return this->wallocator_; }
  268. inline listener_t & listener() noexcept { return this->listener_; }
  269. inline std::atomic<state_t> & state () noexcept { return this->state_; }
  270. inline const char* life_id () noexcept { return this->life_id_.get(); }
  271. inline void reset_life_id () noexcept { this->life_id_ = std::make_unique<char>(); }
  272. protected:
  273. /// The memory to use for handler-based custom memory allocation. used fo recv/read.
  274. handler_memory<std::true_type , assizer<args_t>> rallocator_;
  275. /// The memory to use for handler-based custom memory allocation. used fo send/write.
  276. handler_memory<std::false_type, assizer<args_t>> wallocator_;
  277. /// listener
  278. listener_t listener_;
  279. /// buffer
  280. buffer_wrap<buffer_type> buffer_;
  281. /// state
  282. std::atomic<state_t> state_ = state_t::stopped;
  283. /// Remote call (rpc/rdc) response timeout.
  284. std::chrono::steady_clock::duration rc_timeout_ = std::chrono::milliseconds(http_execute_timeout);
  285. /// the pointer of ecs_t
  286. std::shared_ptr<ecs_base> ecs_;
  287. /// client has two status:
  288. /// 1. completely stopped.
  289. /// 2. disconnected but not stopped, the timer or other event are running.
  290. bool stopped_ = true;
  291. /// Whether the async_read... is called.
  292. bool reading_ = false;
  293. /// Used to solve this problem:
  294. ///
  295. /// client_ptr->bind_connect([client_ptr]()
  296. /// {
  297. /// client_ptr->async_send(...);
  298. /// });
  299. ///
  300. /// client_ptr->post([client_ptr]()
  301. /// {
  302. /// client_ptr->stop();
  303. /// client_ptr->start(...);
  304. ///
  305. /// client_ptr->stop();
  306. /// client_ptr->start(...);
  307. /// });
  308. ///
  309. /// We wanted is :
  310. ///
  311. /// stop event
  312. /// start event
  313. /// async send event
  314. /// stop event
  315. /// start event
  316. /// async send event
  317. ///
  318. /// but beacuse the async send will push event into the event queue, so the
  319. /// event queue will be like this:
  320. ///
  321. /// stop event
  322. /// start event
  323. /// stop event
  324. /// start event
  325. /// async send event
  326. /// async send event
  327. ///
  328. /// the async send will be called twice for the last time started client.
  329. std::unique_ptr<char> life_id_ = std::make_unique<char>();
  330. #if defined(_DEBUG) || defined(DEBUG)
  331. std::atomic<int> post_send_counter_ = 0;
  332. std::atomic<int> post_recv_counter_ = 0;
  333. #endif
  334. };
  335. }
  336. #include <asio2/base/detail/pop_options.hpp>
  337. #endif // !__ASIO2_CLIENT_HPP__