server.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_SERVER_HPP__
  11. #define __ASIO2_SERVER_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <cstdint>
  17. #include <memory>
  18. #include <chrono>
  19. #include <atomic>
  20. #include <string>
  21. #include <string_view>
  22. #include <asio2/base/iopool.hpp>
  23. #include <asio2/base/log.hpp>
  24. #include <asio2/base/listener.hpp>
  25. #include <asio2/base/session_mgr.hpp>
  26. #include <asio2/base/define.hpp>
  27. #include <asio2/base/detail/object.hpp>
  28. #include <asio2/base/detail/allocator.hpp>
  29. #include <asio2/base/detail/util.hpp>
  30. #include <asio2/base/detail/buffer_wrap.hpp>
  31. #include <asio2/base/detail/ecs.hpp>
  32. #include <asio2/base/impl/io_context_cp.hpp>
  33. #include <asio2/base/impl/thread_id_cp.hpp>
  34. #include <asio2/base/impl/user_data_cp.hpp>
  35. #include <asio2/base/impl/user_timer_cp.hpp>
  36. #include <asio2/base/impl/post_cp.hpp>
  37. #include <asio2/base/impl/event_queue_cp.hpp>
  38. #include <asio2/base/impl/condition_event_cp.hpp>
  39. namespace asio2
  40. {
  41. class server
  42. {
  43. public:
  44. inline constexpr static bool is_session() noexcept { return false; }
  45. inline constexpr static bool is_client () noexcept { return false; }
  46. inline constexpr static bool is_server () noexcept { return true ; }
  47. };
  48. }
  49. namespace asio2::detail
  50. {
  51. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  52. template<class derived_t, class session_t>
  53. class server_impl_t
  54. : public asio2::server
  55. , public object_t <derived_t>
  56. , public iopool_cp <derived_t>
  57. , public io_context_cp <derived_t>
  58. , public thread_id_cp <derived_t>
  59. , public event_queue_cp <derived_t>
  60. , public user_data_cp <derived_t>
  61. , public user_timer_cp <derived_t>
  62. , public post_cp <derived_t>
  63. , public condition_event_cp<derived_t>
  64. {
  65. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  66. public:
  67. using super = object_t <derived_t>;
  68. using self = server_impl_t<derived_t, session_t>;
  69. using iopoolcp = iopool_cp <derived_t>;
  70. using args_type = typename session_t::args_type;
  71. using key_type = std::size_t;
  72. public:
  73. /**
  74. * @brief constructor
  75. */
  76. template<class ThreadCountOrScheduler>
  77. explicit server_impl_t(ThreadCountOrScheduler&& tcos)
  78. : object_t <derived_t>()
  79. , iopool_cp <derived_t>(std::forward<ThreadCountOrScheduler>(tcos))
  80. , io_context_cp <derived_t>(iopoolcp::_get_io(0))
  81. , thread_id_cp <derived_t>()
  82. , event_queue_cp <derived_t>()
  83. , user_data_cp <derived_t>()
  84. , user_timer_cp <derived_t>()
  85. , post_cp <derived_t>()
  86. , condition_event_cp<derived_t>()
  87. , rallocator_()
  88. , wallocator_()
  89. , listener_ ()
  90. , sessions_ (this->io_, this->state_)
  91. {
  92. }
  93. /**
  94. * @brief destructor
  95. */
  96. ~server_impl_t()
  97. {
  98. }
  99. /**
  100. * @brief start the server
  101. */
  102. inline bool start() noexcept
  103. {
  104. ASIO2_ASSERT(this->io_->running_in_this_thread());
  105. return true;
  106. }
  107. /**
  108. * @brief stop the server
  109. */
  110. inline void stop()
  111. {
  112. ASIO2_ASSERT(this->io_->running_in_this_thread());
  113. // can't use post, we need ensure when the derived stop is called, the chain
  114. // must be executed completed.
  115. this->derived().dispatch([this]() mutable
  116. {
  117. // close user custom timers
  118. this->_dispatch_stop_all_timers();
  119. // close all posted timed tasks
  120. this->_dispatch_stop_all_timed_events();
  121. // close all async_events
  122. this->notify_all_condition_events();
  123. // destroy user data, maybe the user data is self shared_ptr,
  124. // if don't destroy it, will cause loop reference.
  125. // read/write user data in other thread which is not the io_context
  126. // thread maybe cause crash.
  127. this->user_data_.reset();
  128. // destroy the ecs
  129. this->ecs_.reset();
  130. });
  131. }
  132. /**
  133. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  134. * After this function is called, this class object cannot be used again.
  135. */
  136. inline void destroy()
  137. {
  138. derived_t& derive = this->derived();
  139. derive.io_.reset();
  140. derive.listener_.clear();
  141. derive.destroy_iopool();
  142. }
  143. /**
  144. * @brief check whether the server is started
  145. */
  146. inline bool is_started() const noexcept
  147. {
  148. return (this->state_ == state_t::started);
  149. }
  150. /**
  151. * @brief check whether the server is stopped
  152. */
  153. inline bool is_stopped() const noexcept
  154. {
  155. return (this->state_ == state_t::stopped);
  156. }
  157. /**
  158. * @brief get this object hash key
  159. */
  160. inline key_type hash_key() const noexcept
  161. {
  162. return reinterpret_cast<key_type>(this);
  163. }
  164. /**
  165. * @brief Asynchronous send data for each session
  166. * supporting multi data formats,see asio::buffer(...) in /asio/buffer.hpp
  167. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  168. * PodType * : async_send("abc");
  169. * PodType (&data)[N] : double m[10]; async_send(m);
  170. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  171. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  172. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  173. */
  174. template<class T>
  175. inline derived_t & async_send(const T& data)
  176. {
  177. this->sessions_.quick_for_each([&data](std::shared_ptr<session_t>& session_ptr) mutable
  178. {
  179. session_ptr->async_send(data);
  180. });
  181. return this->derived();
  182. }
  183. /**
  184. * @brief Asynchronous send data for each session
  185. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  186. * PodType * : async_send("abc");
  187. */
  188. template<class CharT, class Traits = std::char_traits<CharT>>
  189. inline typename std::enable_if_t<detail::is_char_v<CharT>, derived_t&> async_send(CharT* s)
  190. {
  191. return this->async_send(s, s ? Traits::length(s) : 0);
  192. }
  193. /**
  194. * @brief Asynchronous send data for each session
  195. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  196. * PodType (&data)[N] : double m[10]; async_send(m,5);
  197. */
  198. template<class CharT, class SizeT>
  199. inline typename std::enable_if_t<std::is_integral_v<detail::remove_cvref_t<SizeT>>, derived_t&>
  200. async_send(CharT* s, SizeT count)
  201. {
  202. if (s)
  203. {
  204. this->sessions_.quick_for_each([s, count](std::shared_ptr<session_t>& session_ptr) mutable
  205. {
  206. session_ptr->async_send(s, count);
  207. });
  208. }
  209. return this->derived();
  210. }
  211. public:
  212. /**
  213. * @brief get the acceptor reference, derived classes must override this function
  214. */
  215. inline auto & acceptor() noexcept { return this->derived().acceptor(); }
  216. /**
  217. * @brief get the acceptor reference, derived classes must override this function
  218. */
  219. inline auto const& acceptor() const noexcept { return this->derived().acceptor(); }
  220. /**
  221. * @brief get the listen address, same as get_listen_address
  222. */
  223. inline std::string listen_address() const noexcept
  224. {
  225. return this->get_listen_address();
  226. }
  227. /**
  228. * @brief get the listen address
  229. */
  230. inline std::string get_listen_address() const noexcept
  231. {
  232. try
  233. {
  234. return this->acceptor().local_endpoint().address().to_string();
  235. }
  236. catch (system_error & e) { set_last_error(e); }
  237. return std::string();
  238. }
  239. /**
  240. * @brief get the listen port, same as get_listen_port
  241. */
  242. inline unsigned short listen_port() const noexcept
  243. {
  244. return this->get_listen_port();
  245. }
  246. /**
  247. * @brief get the listen port
  248. */
  249. inline unsigned short get_listen_port() const noexcept
  250. {
  251. return this->acceptor().local_endpoint(get_last_error()).port();
  252. }
  253. /**
  254. * @brief get connected session count, same as get_session_count
  255. */
  256. inline std::size_t session_count() const noexcept { return this->get_session_count(); }
  257. /**
  258. * @brief get connected session count
  259. */
  260. inline std::size_t get_session_count() const noexcept { return this->sessions_.size(); }
  261. /**
  262. * @brief Applies the given function object fn for each session.
  263. * @param fn - The handler to be called for each session.
  264. * Function signature :
  265. * void(std::shared_ptr<asio2::xxx_session>& session_ptr)
  266. */
  267. template<class Fun>
  268. inline derived_t & foreach_session(Fun&& fn)
  269. {
  270. this->sessions_.for_each(std::forward<Fun>(fn));
  271. return this->derived();
  272. }
  273. /**
  274. * @brief find the session by session's hash key
  275. */
  276. template<class KeyType>
  277. inline std::shared_ptr<session_t> find_session(const KeyType& key)
  278. {
  279. return this->sessions_.find(key);
  280. }
  281. /**
  282. * @brief find the session by user custom role
  283. * @param fn - The handler to be called when search the session.
  284. * Function signature :
  285. * bool(std::shared_ptr<asio2::xxx_session>& session_ptr)
  286. * @return std::shared_ptr<asio2::xxx_session>
  287. */
  288. template<class Fun>
  289. inline std::shared_ptr<session_t> find_session_if(Fun&& fn)
  290. {
  291. return std::shared_ptr<session_t>(this->sessions_.find_if(std::forward<Fun>(fn)));
  292. }
  293. protected:
  294. /**
  295. * @brief get the recv/read allocator object reference
  296. */
  297. inline auto & rallocator() noexcept { return this->rallocator_; }
  298. /**
  299. * @brief get the send/write/post allocator object reference
  300. */
  301. inline auto & wallocator() noexcept { return this->wallocator_; }
  302. inline session_mgr_t<session_t> & sessions() noexcept { return this->sessions_; }
  303. inline listener_t & listener() noexcept { return this->listener_; }
  304. inline std::atomic<state_t> & state () noexcept { return this->state_; }
  305. protected:
  306. // The memory to use for handler-based custom memory allocation. used for acceptor.
  307. handler_memory<std::true_type , assizer<args_type>> rallocator_;
  308. /// The memory to use for handler-based custom memory allocation. used fo send/write/post.
  309. handler_memory<std::false_type, assizer<args_type>> wallocator_;
  310. /// listener
  311. listener_t listener_;
  312. /// state
  313. std::atomic<state_t> state_ = state_t::stopped;
  314. /// session_mgr
  315. session_mgr_t<session_t> sessions_;
  316. /// use this to ensure that server stop only after all sessions are closed
  317. std::shared_ptr<void> counter_ptr_;
  318. /// the pointer of ecs_t
  319. std::shared_ptr<ecs_base> ecs_;
  320. #if defined(_DEBUG) || defined(DEBUG)
  321. std::atomic<int> post_send_counter_ = 0;
  322. std::atomic<int> post_recv_counter_ = 0;
  323. #endif
  324. };
  325. }
  326. #include <asio2/base/detail/pop_options.hpp>
  327. #endif // !__ASIO2_SERVER_HPP__