close_cp.hpp 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_CLOSE_COMPONENT_HPP__
  11. #define __ASIO2_CLOSE_COMPONENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <memory>
  16. #include <future>
  17. #include <utility>
  18. #include <string_view>
  19. #include <asio2/base/iopool.hpp>
  20. #include <asio2/base/listener.hpp>
  21. #include <asio2/base/impl/event_queue_cp.hpp>
  22. namespace asio2::detail
  23. {
  24. template<class derived_t, class args_t>
  25. class close_cp
  26. {
  27. public:
  28. using self = close_cp<derived_t, args_t>;
  29. public:
  30. /**
  31. * @brief constructor
  32. */
  33. close_cp() noexcept {}
  34. /**
  35. * @brief destructor
  36. */
  37. ~close_cp() = default;
  38. protected:
  39. template<typename DeferEvent>
  40. inline void _do_close(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  41. {
  42. derived_t& derive = static_cast<derived_t&>(*this);
  43. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  44. ASIO2_LOG_DEBUG("close_cp::_do_close enter: {} {}", ec.value(), ec.message());
  45. // Normally, do close function will be called in the io_context thread, but if some
  46. // exception occured, do close maybe called in the "catch(){ ... }", then this maybe
  47. // not in the io_context thread.
  48. // When the stop() is called for each session in the server's _post_stop, this maybe also
  49. // not in the io_context thread.
  50. // If the session_ptr->stop() is called not in io_context thread, then this will be not in
  51. // the io_context thread.
  52. // If we don't ensure this function is called in the io_context thread, the session status
  53. // maybe stopping in the bind_recv callback.
  54. // use disp event to change the state to avoid this problem:
  55. // 1. when the event queue is not empty, call client stop, then the stop event will pushed
  56. // into the event queue.
  57. // 2. in the io_context thread, the do close is called directly (without event queue),
  58. // it will change the state to stopping.
  59. // 3. when the client _do stop is called, the state is not equal to stopped, it is equal to
  60. // stopping.
  61. derive.disp_event(
  62. [&derive, ec, this_ptr = std::move(this_ptr), e = chain.move_event()]
  63. (event_queue_guard<derived_t> g) mutable
  64. {
  65. set_last_error(ec);
  66. defer_event chain(std::move(e), std::move(g));
  67. ASIO2_LOG_DEBUG("close_cp::_do_close leave: {} {} state={}",
  68. ec.value(), ec.message(), detail::to_string(derive.state_.load()));
  69. state_t expected = state_t::started;
  70. if (derive.state_.compare_exchange_strong(expected, state_t::stopping))
  71. {
  72. return derive._post_close(ec, std::move(this_ptr), expected, std::move(chain));
  73. }
  74. expected = state_t::starting;
  75. if (derive.state_.compare_exchange_strong(expected, state_t::stopping))
  76. {
  77. return derive._post_close(ec, std::move(this_ptr), expected, std::move(chain));
  78. }
  79. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  80. [this_ptr = std::move(this_ptr), chain = std::move(chain)]() mutable
  81. {
  82. detail::ignore_unused(this_ptr, chain);
  83. }));
  84. }, chain.move_guard());
  85. }
  86. template<typename DeferEvent, bool IsSession = args_t::is_session>
  87. inline typename std::enable_if_t<!IsSession, void>
  88. _post_close(const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state, DeferEvent chain)
  89. {
  90. derived_t& derive = static_cast<derived_t&>(*this);
  91. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  92. ASIO2_LOG_DEBUG("close_cp::_post_close: {} {}", ec.value(), ec.message());
  93. // All pending sending events will be cancelled after enter the callback below.
  94. derive.disp_event(
  95. [&derive, ec, this_ptr = std::move(this_ptr), old_state, e = chain.move_event()]
  96. (event_queue_guard<derived_t> g) mutable
  97. {
  98. set_last_error(ec);
  99. defer_event chain(std::move(e), std::move(g));
  100. // When the connection is closed, should we set the state to stopping or stopped?
  101. // If the state is set to stopped, then the user wants to use client.is_stopped() to
  102. // determine whether the client has stopped. The result is inaccurate because the client
  103. // has not stopped completely, such as the timer is still running.
  104. // If the state is set to stopping, the user will fail to reconnect the client using
  105. // client.start(...) in the bind_disconnect callback. because the client.start(...)
  106. // function will detects the value of state and the client.start(...) will only executed
  107. // if the state is stopped.
  108. state_t expected = state_t::stopping;
  109. if (derive.state_.compare_exchange_strong(expected, state_t::stopped))
  110. {
  111. if (old_state == state_t::started)
  112. {
  113. derive._fire_disconnect(this_ptr);
  114. }
  115. }
  116. else
  117. {
  118. ASIO2_ASSERT(false);
  119. }
  120. if (chain.empty())
  121. {
  122. derive._handle_close(ec, this_ptr, defer_event
  123. {
  124. [&derive, this_ptr](event_queue_guard<derived_t> g) mutable
  125. {
  126. // Use disp_event to ensure that reconnection will not executed until
  127. // all events are completed.
  128. derive.disp_event([&derive, this_ptr = std::move(this_ptr)]
  129. (event_queue_guard<derived_t> g) mutable
  130. {
  131. detail::ignore_unused(this_ptr, g);
  132. if (derive.reconnect_enable_)
  133. derive._wake_reconnect_timer();
  134. else
  135. derive._stop_reconnect_timer();
  136. }, std::move(g));
  137. }, chain.move_guard()
  138. });
  139. }
  140. else
  141. {
  142. derive._handle_close(ec, std::move(this_ptr), std::move(chain));
  143. }
  144. // can't call derive._do_stop() here, it will cause the auto reconnect invalid when
  145. // server is closed. so we use the chain to determine whether we should call
  146. // derive._do_stop()
  147. }, chain.move_guard());
  148. }
  149. template<typename DeferEvent, bool IsSession = args_t::is_session>
  150. inline typename std::enable_if_t<IsSession, void>
  151. _post_close(const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state, DeferEvent chain)
  152. {
  153. derived_t& derive = static_cast<derived_t&>(*this);
  154. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  155. ASIO2_LOG_DEBUG("close_cp::_post_close: {} {}", ec.value(), ec.message());
  156. // close the socket by post a event
  157. // asio don't allow operate the same socket in multi thread,if you close socket
  158. // in one thread and another thread is calling socket's async_... function,it
  159. // will crash.so we must care for operate the socket. when need close the
  160. // socket, we use the context to post a event, make sure the socket's close
  161. // operation is in the same thread.
  162. // First ensure that all send and recv events are not executed again
  163. derive.disp_event(
  164. [&derive, ec, old_state, this_ptr = std::move(this_ptr), e = chain.move_event()]
  165. (event_queue_guard<derived_t> g) mutable
  166. {
  167. // All pending sending events will be cancelled when code run to here.
  168. // We must use the asio::post function to execute the task, otherwise :
  169. // when the server acceptor thread is same as this session thread,
  170. // when the server stop, will call sessions_.for_each -> session_ptr->stop() ->
  171. // derived().disp_event -> sessions_.erase => this can leads to a dead lock
  172. defer_event chain(std::move(e), std::move(g));
  173. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  174. [&derive, ec, old_state, this_ptr = std::move(this_ptr), chain = std::move(chain)]
  175. () mutable
  176. {
  177. // Second ensure that this session has removed from the session map.
  178. derive.sessions_.erase(this_ptr,
  179. [&derive, ec, old_state, this_ptr, chain = std::move(chain)]
  180. (bool erased) mutable
  181. {
  182. set_last_error(ec);
  183. state_t expected = state_t::stopping;
  184. if (derive.state_.compare_exchange_strong(expected, state_t::stopped))
  185. {
  186. if (old_state == state_t::started && erased)
  187. derive._fire_disconnect(const_cast<std::shared_ptr<derived_t>&>(this_ptr));
  188. }
  189. else
  190. {
  191. ASIO2_ASSERT(false);
  192. }
  193. // Third we can stop this session and close this socket now.
  194. asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
  195. [&derive, ec, this_ptr = std::move(this_ptr), chain = std::move(chain)]
  196. () mutable
  197. {
  198. // call CRTP polymorphic stop
  199. derive._handle_close(ec, std::move(this_ptr), std::move(chain));
  200. }));
  201. });
  202. }));
  203. }, chain.move_guard());
  204. }
  205. template<typename DeferEvent>
  206. inline void _handle_close(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  207. {
  208. derived_t& derive = static_cast<derived_t&>(*this);
  209. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  210. ASIO2_LOG_DEBUG("close_cp::_handle_close: {} {}", ec.value(), ec.message());
  211. derive._post_disconnect(ec, std::move(this_ptr), std::move(chain));
  212. }
  213. };
  214. }
  215. #endif // !__ASIO2_CLOSE_COMPONENT_HPP__