session_mgr.hpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_SESSION_MGR_HPP__
  11. #define __ASIO2_SESSION_MGR_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <algorithm>
  16. #include <memory>
  17. #include <functional>
  18. #include <unordered_map>
  19. #include <type_traits>
  20. #include <asio2/base/iopool.hpp>
  21. #include <asio2/base/define.hpp>
  22. #include <asio2/base/log.hpp>
  23. #include <asio2/base/detail/allocator.hpp>
  24. #include <asio2/base/detail/util.hpp>
  25. #include <asio2/base/detail/shared_mutex.hpp>
  26. namespace asio2::detail
  27. {
  28. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  29. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  30. ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;
  31. ASIO2_CLASS_FORWARD_DECLARE_TCP_SESSION;
  32. ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;
  33. ASIO2_CLASS_FORWARD_DECLARE_UDP_SERVER;
  34. ASIO2_CLASS_FORWARD_DECLARE_UDP_SESSION;
  35. /**
  36. * the session manager interface
  37. */
  38. template<class session_t>
  39. class session_mgr_t
  40. {
  41. friend session_t; // C++11
  42. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  43. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  44. ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;
  45. ASIO2_CLASS_FRIEND_DECLARE_TCP_SESSION;
  46. ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;
  47. ASIO2_CLASS_FRIEND_DECLARE_UDP_SERVER;
  48. ASIO2_CLASS_FRIEND_DECLARE_UDP_SESSION;
  49. protected:
  50. #if defined(_DEBUG) || defined(DEBUG)
  51. class [[maybe_unused]] deadlock_checker_value
  52. {
  53. public:
  54. [[maybe_unused]] static bool& get() noexcept
  55. {
  56. thread_local static bool b = false;
  57. return b;
  58. }
  59. };
  60. struct deadlock_checker_guard
  61. {
  62. deadlock_checker_guard(bool& b) : b_(b)
  63. {
  64. ASIO2_ASSERT(b_ == false);
  65. b_ = true;
  66. }
  67. ~deadlock_checker_guard()
  68. {
  69. ASIO2_ASSERT(b_ == true);
  70. b_ = false;
  71. }
  72. bool& b_;
  73. };
  74. #endif
  75. public:
  76. using self = session_mgr_t<session_t>;
  77. using args_type = typename session_t::args_type;
  78. using key_type = typename session_t::key_type;
  79. /**
  80. * @brief constructor
  81. */
  82. explicit session_mgr_t(std::shared_ptr<io_t> acceptor_io, std::atomic<state_t>& server_state)
  83. : io_ (std::move(acceptor_io))
  84. , state_(server_state)
  85. {
  86. this->sessions_.reserve(64);
  87. }
  88. /**
  89. * @brief destructor
  90. */
  91. ~session_mgr_t() = default;
  92. /**
  93. * @brief emplace the session
  94. * @callback : void(bool inserted);
  95. */
  96. template<class Fun>
  97. inline void emplace(std::shared_ptr<session_t> session_ptr, Fun&& callback)
  98. {
  99. if (!session_ptr)
  100. return;
  101. asio::dispatch(this->io_->context(), make_allocator(this->allocator_,
  102. [this, session_ptr = std::move(session_ptr), callback = std::forward<Fun>(callback)]
  103. () mutable
  104. {
  105. bool inserted = false;
  106. // when run to here, the server state maybe started or stopping or stopped,
  107. // if server state is not started, must can't push the session to the map
  108. // again, and we need disconnect the session directly, otherwise the server
  109. // maybe stopping, and the iopool's wait_iothreas is running in the "sleep"
  110. // this will cause the server.stop() never return;
  111. if (this->state_ == state_t::started)
  112. {
  113. // when code run to here, user maybe call server.stop() at other thread,
  114. // if user do this, at this time, the state_ is not started already( it
  115. // will be stopping ), but beacuase the server's sessions_.for_each ->
  116. // session_ptr->stop(); is execute in the thread 0, and this code is
  117. // executed in thread 0 too, so when code run to here, beacuse we have
  118. // " if (this->state_ == state_t::started) " judgment statement, so the
  119. // server's sessions_.for_each -> session_ptr->stop(); must not be
  120. // executed yet, so even if we put the session in the map here, it will
  121. // not have a problem, beacuse the server's sessions_.for_each ->
  122. // session_ptr->stop(); will be called a later, and this session will be
  123. // stopped at there.
  124. // we use a assert to check the server's sessions_.for_each ->
  125. // session_ptr->stop(); must not be executed yet.
  126. #if defined(_DEBUG) || defined(DEBUG)
  127. ASIO2_ASSERT(is_all_session_stop_called_ == false);
  128. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  129. #endif
  130. // this thread is same as the server's io thread, when code run to here,
  131. // the server's _post_stop must not be executed, so the server's sessions_.for_each
  132. // -> session_ptr->stop() must not be executed.
  133. asio2::unique_locker guard(this->mutex_);
  134. inserted = this->sessions_.try_emplace(session_ptr->hash_key(), session_ptr).second;
  135. #if defined(_DEBUG) || defined(DEBUG)
  136. ASIO2_ASSERT(is_all_session_stop_called_ == false);
  137. #endif
  138. }
  139. (callback)(inserted);
  140. }));
  141. }
  142. /**
  143. * @brief erase the session
  144. * @callback : void(bool erased);
  145. */
  146. template<class Fun>
  147. inline void erase(std::shared_ptr<session_t> session_ptr, Fun&& callback)
  148. {
  149. if (!session_ptr)
  150. return;
  151. asio::dispatch(this->io_->context(), make_allocator(this->allocator_,
  152. [this, session_ptr = std::move(session_ptr), callback = std::forward<Fun>(callback)]
  153. () mutable
  154. {
  155. bool erased = false;
  156. #if defined(_DEBUG) || defined(DEBUG)
  157. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  158. #endif
  159. {
  160. asio2::unique_locker guard(this->mutex_);
  161. erased = (this->sessions_.erase(session_ptr->hash_key()) > 0);
  162. }
  163. (callback)(erased);
  164. }));
  165. }
  166. /**
  167. * @brief Submits a completion token or function object for execution.
  168. * @task : void();
  169. */
  170. template<class Fun>
  171. inline void post(Fun&& task)
  172. {
  173. asio::post(this->io_->context(), make_allocator(this->allocator_, std::forward<Fun>(task)));
  174. }
  175. /**
  176. * @brief Submits a completion token or function object for execution.
  177. * @task : void();
  178. */
  179. template<class Fun>
  180. inline void dispatch(Fun&& task)
  181. {
  182. asio::dispatch(this->io_->context(), make_allocator(this->allocator_, std::forward<Fun>(task)));
  183. }
  184. /**
  185. * @brief call user custom callback function for every session
  186. * the custom callback function is like this :
  187. * void on_callback(std::shared_ptr<tcp_session> & session_ptr)
  188. */
  189. template<class Fun>
  190. inline void for_each(Fun&& fn)
  191. {
  192. // thred safety for each
  193. //
  194. std::vector<std::shared_ptr<session_t>> sessions;
  195. #if defined(_DEBUG) || defined(DEBUG)
  196. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  197. #endif
  198. {
  199. asio2::shared_locker guard(this->mutex_);
  200. sessions.reserve(this->sessions_.size());
  201. for (const auto& [k, session_ptr] : this->sessions_)
  202. {
  203. std::ignore = k;
  204. sessions.emplace_back(session_ptr);
  205. }
  206. }
  207. for (std::shared_ptr<session_t>& session_ptr : sessions)
  208. {
  209. fn(session_ptr);
  210. }
  211. // if the unique locker was called in the callback inner, then will cause deadlock.
  212. // and if the callback is a time-consuming operation, the new session will can't enter.
  213. //
  214. //asio2::shared_locker guard(this->mutex_);
  215. //for (auto& [k, session_ptr] : this->sessions_)
  216. //{
  217. // std::ignore = k;
  218. // fn(session_ptr);
  219. //}
  220. }
  221. /**
  222. * @brief call user custom callback function for every session
  223. * the custom callback function is like this :
  224. * void on_callback(std::shared_ptr<tcp_session> & session_ptr)
  225. */
  226. template<class Fun>
  227. inline void quick_for_each(Fun&& fn)
  228. {
  229. // if the unique locker was called in the callback inner, then will cause deadlock.
  230. // and if the callback is a time-consuming operation, the new session will can't enter.
  231. asio2::shared_locker guard(this->mutex_);
  232. #if defined(_DEBUG) || defined(DEBUG)
  233. [[maybe_unused]] deadlock_checker_guard leg(deadlock_checker_value::get());
  234. #endif
  235. for (auto& [k, session_ptr] : this->sessions_)
  236. {
  237. std::ignore = k;
  238. fn(session_ptr);
  239. }
  240. }
  241. /**
  242. * @brief find the session by map key
  243. */
  244. inline std::shared_ptr<session_t> find(const key_type & key)
  245. {
  246. #if defined(_DEBUG) || defined(DEBUG)
  247. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  248. #endif
  249. asio2::shared_locker guard(this->mutex_);
  250. auto iter = this->sessions_.find(key);
  251. return (iter == this->sessions_.end() ? std::shared_ptr<session_t>() : iter->second);
  252. }
  253. /**
  254. * @brief find the session by user custom role
  255. * bool on_callback(std::shared_ptr<tcp_session> & session_ptr)
  256. */
  257. template<class Fun>
  258. inline std::shared_ptr<session_t> find_if(Fun&& fn)
  259. {
  260. #if defined(_DEBUG) || defined(DEBUG)
  261. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  262. #endif
  263. // if the unique locker was called in the callback inner, then will cause deadlock.
  264. asio2::shared_locker guard(this->mutex_);
  265. auto iter = std::find_if(this->sessions_.begin(), this->sessions_.end(),
  266. [&fn](auto &pair) mutable
  267. {
  268. return fn(pair.second);
  269. });
  270. return (iter == this->sessions_.end() ? std::shared_ptr<session_t>() : iter->second);
  271. }
  272. /**
  273. * @brief get session count
  274. */
  275. inline std::size_t size() const noexcept
  276. {
  277. #if defined(_DEBUG) || defined(DEBUG)
  278. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  279. #endif
  280. asio2::shared_locker guard(this->mutex_);
  281. // is std map.size() thread safety?
  282. //
  283. // https://stackoverflow.com/questions/2170541/what-operations-are-thread-safe-on-stdmap
  284. // https://en.cppreference.com/w/cpp/container
  285. // https://timsong-cpp.github.io/cppwp/n3337/container.requirements.dataraces
  286. // https://stackoverflow.com/questions/15067160/stdmap-thread-safety
  287. // https://stackoverflow.com/questions/14127379/does-const-mean-thread-safe-in-c11
  288. // There is no one consensus:
  289. // The C++11 standard guarantees that const method access to containers is safe from
  290. // different threads (ie, both use const methods).
  291. // It should be thread-safe to call a const function from multiple threads simultaneously,
  292. // without calling a non-const function at the same time in another thread.
  293. // after my test on windows and linux:
  294. // multithread call const function without mutex, and multithread call no-const function
  295. // with mutex at the same time, the result of the const function seems to be no problem.
  296. //#include <unordered_map>
  297. //#include <shared_mutex>
  298. //#include <thread>
  299. //
  300. //int main()
  301. //{
  302. // std::shared_mutex mtx;
  303. // std::unordered_map<int, int> map;
  304. //
  305. // std::srand((unsigned int)std::time(nullptr));
  306. //
  307. // for (std::size_t i = 0; i < std::thread::hardware_concurrency() * 2; i++)
  308. // {
  309. // std::thread([&]() mutable
  310. // {
  311. // for (;;)
  312. // {
  313. // int n = std::rand();
  314. // std::unique_lock g(mtx);
  315. // if (map.size() < 1000)
  316. // map.emplace(n, n);
  317. // else
  318. // std::this_thread::sleep_for(std::chrono::milliseconds(0));
  319. // }
  320. // }).detach();
  321. // }
  322. //
  323. // for (std::size_t i = 0; i < std::thread::hardware_concurrency() * 2; i++)
  324. // {
  325. // std::thread([&]() mutable
  326. // {
  327. // for (;;)
  328. // {
  329. // std::unique_lock g(mtx);
  330. // if (map.size() > 500)
  331. // map.erase(map.begin());
  332. // else
  333. // std::this_thread::sleep_for(std::chrono::milliseconds(0));
  334. // }
  335. // }).detach();
  336. // }
  337. //
  338. // std::this_thread::sleep_for(std::chrono::seconds(5));
  339. //
  340. // for (std::size_t i = 0; i < std::thread::hardware_concurrency() * 2; i++)
  341. // {
  342. // std::thread([&]() mutable
  343. // {
  344. // for (;;)
  345. // {
  346. // int n = int(map.size());
  347. // if (n < 500 || n > 1000 || map.empty())
  348. // {
  349. // printf("error %d\n", n);
  350. // }
  351. // }
  352. // }).detach();
  353. // }
  354. //
  355. // while (std::getchar() != '\n');
  356. //
  357. // return 0;
  358. //}
  359. return this->sessions_.size();
  360. }
  361. /**
  362. * @brief Checks if the session container has no elements
  363. */
  364. inline bool empty() const noexcept
  365. {
  366. #if defined(_DEBUG) || defined(DEBUG)
  367. ASIO2_ASSERT(deadlock_checker_value::get() == false);
  368. #endif
  369. asio2::shared_locker guard(this->mutex_);
  370. return this->sessions_.empty();
  371. }
  372. /**
  373. * @brief get the io object reference
  374. */
  375. inline io_t & io() noexcept
  376. {
  377. return (*this->io_);
  378. }
  379. /**
  380. * @brief get the io object reference
  381. */
  382. inline io_t const& io() const noexcept
  383. {
  384. return (*this->io_);
  385. }
  386. protected:
  387. /// use rwlock to make this session map thread safe
  388. mutable asio2::shared_mutexer mutex_;
  389. /// session unorder map,these session is already connected session
  390. std::unordered_map<key_type, std::shared_ptr<session_t>> sessions_ ASIO2_GUARDED_BY(mutex_);
  391. /// the zero io_context reference in the iopool
  392. std::shared_ptr<io_t> io_;
  393. /// The memory to use for handler-based custom memory allocation.
  394. handler_memory<std::false_type, assizer<args_type>> allocator_;
  395. /// server state reference
  396. std::atomic<state_t> & state_;
  397. #if defined(_DEBUG) || defined(DEBUG)
  398. bool is_all_session_stop_called_ = false;
  399. #endif
  400. };
  401. }
  402. #endif // !__ASIO2_SESSION_MGR_HPP__