session.hpp 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_SESSION_HPP__
  11. #define __ASIO2_SESSION_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <cstdint>
  17. #include <memory>
  18. #include <chrono>
  19. #include <atomic>
  20. #include <string>
  21. #include <string_view>
  22. #include <asio2/base/iopool.hpp>
  23. #include <asio2/base/log.hpp>
  24. #include <asio2/base/listener.hpp>
  25. #include <asio2/base/session_mgr.hpp>
  26. #include <asio2/base/define.hpp>
  27. #include <asio2/base/detail/object.hpp>
  28. #include <asio2/base/detail/allocator.hpp>
  29. #include <asio2/base/detail/util.hpp>
  30. #include <asio2/base/detail/buffer_wrap.hpp>
  31. #include <asio2/base/detail/ecs.hpp>
  32. #include <asio2/base/impl/io_context_cp.hpp>
  33. #include <asio2/base/impl/thread_id_cp.hpp>
  34. #include <asio2/base/impl/connect_time_cp.hpp>
  35. #include <asio2/base/impl/alive_time_cp.hpp>
  36. #include <asio2/base/impl/user_data_cp.hpp>
  37. #include <asio2/base/impl/socket_cp.hpp>
  38. #include <asio2/base/impl/connect_cp.hpp>
  39. #include <asio2/base/impl/shutdown_cp.hpp>
  40. #include <asio2/base/impl/close_cp.hpp>
  41. #include <asio2/base/impl/disconnect_cp.hpp>
  42. #include <asio2/base/impl/user_timer_cp.hpp>
  43. #include <asio2/base/impl/silence_timer_cp.hpp>
  44. #include <asio2/base/impl/post_cp.hpp>
  45. #include <asio2/base/impl/connect_timeout_cp.hpp>
  46. #include <asio2/base/impl/event_queue_cp.hpp>
  47. #include <asio2/base/impl/condition_event_cp.hpp>
  48. #include <asio2/base/impl/send_cp.hpp>
  49. #include <asio2/component/rdc/rdc_call_cp.hpp>
  50. namespace asio2
  51. {
  /**
   * @brief Compile-time role tag for session objects.
   *
   * The three static predicates let generic code ask what kind of endpoint a
   * type is. For this tag only is_session() answers true.
   */
  class session
  {
  public:
    /// @return always true: this tag marks a session.
    static constexpr bool is_session() noexcept { return true; }

    /// @return always false: a session is not a client.
    static constexpr bool is_client() noexcept { return false; }

    /// @return always false: a session is not a server.
    static constexpr bool is_server() noexcept { return false; }
  };
  59. }
  60. namespace asio2::detail
  61. {
  62. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  63. template<class derived_t, class args_t>
  64. class session_impl_t
  65. : public asio2::session
  66. , public object_t <derived_t >
  67. , public thread_id_cp <derived_t, args_t>
  68. , public io_context_cp <derived_t, args_t>
  69. , public event_queue_cp <derived_t, args_t>
  70. , public user_data_cp <derived_t, args_t>
  71. , public connect_time_cp <derived_t, args_t>
  72. , public alive_time_cp <derived_t, args_t>
  73. , public socket_cp <derived_t, args_t>
  74. , public connect_cp <derived_t, args_t>
  75. , public shutdown_cp <derived_t, args_t>
  76. , public close_cp <derived_t, args_t>
  77. , public disconnect_cp <derived_t, args_t>
  78. , public user_timer_cp <derived_t, args_t>
  79. , public silence_timer_cp <derived_t, args_t>
  80. , public connect_timeout_cp <derived_t, args_t>
  81. , public send_cp <derived_t, args_t>
  82. , public post_cp <derived_t, args_t>
  83. , public condition_event_cp <derived_t, args_t>
  84. , public rdc_call_cp <derived_t, args_t>
  85. {
  86. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  87. public:
  88. using super = object_t <derived_t >;
  89. using self = session_impl_t<derived_t, args_t>;
  90. using args_type = args_t;
  91. using buffer_type = typename args_t::buffer_t;
  92. using send_cp<derived_t, args_t>::send;
  93. using send_cp<derived_t, args_t>::async_send;
  94. public:
  95. /**
  96. * @brief constructor
  97. * @throws maybe throw exception "Too many open files" (exception code : 24)
  98. * asio::error::no_descriptors - Too many open files
  99. */
  100. template<class ...Args>
  101. explicit session_impl_t(
  102. session_mgr_t<derived_t> & sessions,
  103. listener_t & listener,
  104. std::shared_ptr<io_t> rwio,
  105. std::size_t init_buf_size,
  106. std::size_t max_buf_size,
  107. Args&&... args
  108. )
  109. : super()
  110. , thread_id_cp <derived_t, args_t>()
  111. , io_context_cp <derived_t, args_t>(std::move(rwio))
  112. , event_queue_cp <derived_t, args_t>()
  113. , user_data_cp <derived_t, args_t>()
  114. , connect_time_cp <derived_t, args_t>()
  115. , alive_time_cp <derived_t, args_t>()
  116. , socket_cp <derived_t, args_t>(std::forward<Args>(args)...)
  117. , connect_cp <derived_t, args_t>()
  118. , shutdown_cp <derived_t, args_t>()
  119. , close_cp <derived_t, args_t>()
  120. , disconnect_cp <derived_t, args_t>()
  121. , user_timer_cp <derived_t, args_t>()
  122. , silence_timer_cp <derived_t, args_t>()
  123. , connect_timeout_cp <derived_t, args_t>()
  124. , send_cp <derived_t, args_t>()
  125. , post_cp <derived_t, args_t>()
  126. , condition_event_cp <derived_t, args_t>()
  127. , rdc_call_cp <derived_t, args_t>()
  128. , sessions_(sessions)
  129. , listener_(listener)
  130. , buffer_ (init_buf_size, max_buf_size)
  131. {
  132. }
  133. /**
  134. * @brief destructor
  135. */
  136. ~session_impl_t()
  137. {
  138. }
  139. protected:
  140. /**
  141. * @brief start session
  142. */
  143. inline void start()
  144. {
  145. this->derived().dispatch([this]() mutable
  146. {
  147. // init the running thread id
  148. this->derived().io_->init_thread_id();
  149. // start the timer of check connect timeout
  150. this->derived()._make_connect_timeout_timer(
  151. this->derived().selfptr(), this->connect_timeout_);
  152. });
  153. }
  154. public:
  155. /**
  156. * @brief stop session
  157. * note : this function must be noblocking,if it's blocking,
  158. * will cause circle lock in session_mgr::stop function
  159. */
  160. inline void stop()
  161. {
  162. ASIO2_ASSERT(this->io_->running_in_this_thread());
  163. // can't use post, we need ensure when the derived stop is called, the chain
  164. // must be executed completed.
  165. this->derived().dispatch([this]() mutable
  166. {
  167. // close silence timer
  168. this->_stop_silence_timer();
  169. // close connect timeout timer
  170. this->_stop_connect_timeout_timer();
  171. // close user custom timers
  172. this->_dispatch_stop_all_timers();
  173. // close all posted timed tasks
  174. this->_dispatch_stop_all_timed_events();
  175. // close all async_events
  176. this->notify_all_condition_events();
  177. // clear recv buffer
  178. this->buffer().consume(this->buffer().size());
  179. // destroy user data, maybe the user data is self shared_ptr,
  180. // if don't destroy it, will cause loop reference.
  181. // read/write user data in other thread which is not the io_context
  182. // thread maybe cause crash.
  183. this->user_data_.reset();
  184. // destroy the ecs, the user maybe saved the session ptr in the match role init
  185. // function, so we must destroy ecs, otherwise the server will can't be exited
  186. // forever.
  187. this->ecs_.reset();
  188. //
  189. this->reset_life_id();
  190. //
  191. this->counter_ptr_.reset();
  192. ASIO2_ASSERT(this->sessions_.find(this->derived().hash_key()) == nullptr);
  193. });
  194. }
  195. /**
  196. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  197. * After this function is called, this class object cannot be used again.
  198. */
  199. inline void destroy()
  200. {
  201. derived_t& derive = this->derived();
  202. derive.socket_.reset();
  203. derive.io_.reset();
  204. }
  205. /**
  206. * @brief check whether the session is started
  207. */
  208. inline bool is_started() const
  209. {
  210. return (this->state_ == state_t::started && this->socket().is_open());
  211. }
  212. /**
  213. * @brief check whether the session is stopped
  214. */
  215. inline bool is_stopped() const
  216. {
  217. return (this->state_ == state_t::stopped && !this->socket().is_open());
  218. }
  219. /**
  220. * @brief get the buffer object reference
  221. */
  222. inline buffer_wrap<buffer_type> & buffer() noexcept
  223. {
  224. return this->buffer_;
  225. }
  226. /**
  227. * @brief set the default remote call timeout for rpc/rdc
  228. */
  229. template<class Rep, class Period>
  230. inline derived_t & set_default_timeout(std::chrono::duration<Rep, Period> duration) noexcept
  231. {
  232. this->rc_timeout_ = duration;
  233. return (this->derived());
  234. }
  235. /**
  236. * @brief get the default remote call timeout for rpc/rdc
  237. */
  238. inline std::chrono::steady_clock::duration get_default_timeout() const noexcept
  239. {
  240. return this->rc_timeout_;
  241. }
  242. protected:
  243. inline session_mgr_t<derived_t> & sessions() noexcept { return this->sessions_; }
  244. inline listener_t & listener() noexcept { return this->listener_; }
  245. inline std::atomic<state_t> & state () noexcept { return this->state_; }
  246. inline constexpr bool life_id () noexcept { return true; }
  247. inline constexpr void reset_life_id () noexcept { }
  248. protected:
  249. /// asio::strand ,used to ensure socket multi thread safe,we must ensure that only one operator
  250. /// can operate the same socket at the same time,and strand can enuser that the event will
  251. /// be processed in the order of post, eg : strand.post(1);strand.post(2); the 2 will processed
  252. /// certaion after the 1,if 1 is block,the 2 won't be processed,util the 1 is processed completed
  253. /// more details see : http://bbs.csdn.net/topics/390931471
  254. /// session_mgr
  255. session_mgr_t<derived_t> & sessions_;
  256. /// listener
  257. listener_t & listener_;
  258. /// buffer
  259. buffer_wrap<buffer_type> buffer_;
  260. /// use to check whether the user call stop in the listener
  261. std::atomic<state_t> state_ = state_t::stopped;
  262. /// use this to ensure that server stop only after all sessions are closed
  263. std::shared_ptr<void> counter_ptr_;
  264. /// Remote call (rpc/rdc) response timeout.
  265. std::chrono::steady_clock::duration rc_timeout_ = std::chrono::milliseconds(http_execute_timeout);
  266. /// the pointer of ecs_t
  267. std::shared_ptr<ecs_base> ecs_;
  268. /// Whether the async_read... is called.
  269. bool reading_ = false;
  270. #if defined(_DEBUG) || defined(DEBUG)
  271. std::atomic<int> post_send_counter_ = 0;
  272. std::atomic<int> post_recv_counter_ = 0;
  273. #endif
  274. };
  275. }
  276. #include <asio2/base/detail/pop_options.hpp>
  277. #endif // !__ASIO2_SESSION_HPP__