ws_session.hpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_WS_SESSION_HPP__
  11. #define __ASIO2_WS_SESSION_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/tcp/tcp_session.hpp>
  17. #include <asio2/http/impl/ws_stream_cp.hpp>
  18. #include <asio2/http/impl/ws_send_op.hpp>
  19. #include <asio2/http/request.hpp>
  20. #include <asio2/http/response.hpp>
  21. namespace asio2::detail
  22. {
  23. struct template_args_ws_session : public template_args_tcp_session
  24. {
  25. using stream_t = websocket::stream<typename template_args_tcp_session::socket_t&>;
  26. using body_t = http::string_body;
  27. using buffer_t = beast::flat_buffer;
  28. };
  29. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  30. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  31. ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;
  32. ASIO2_CLASS_FORWARD_DECLARE_TCP_SESSION;
  33. template<class derived_t, class args_t = template_args_ws_session>
  34. class ws_session_impl_t
  35. : public tcp_session_impl_t<derived_t, args_t>
  36. , public ws_stream_cp <derived_t, args_t>
  37. , public ws_send_op <derived_t, args_t>
  38. {
  39. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  40. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  41. ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;
  42. ASIO2_CLASS_FRIEND_DECLARE_TCP_SESSION;
  43. public:
  44. using super = tcp_session_impl_t<derived_t, args_t>;
  45. using self = ws_session_impl_t <derived_t, args_t>;
  46. using args_type = args_t;
  47. using key_type = std::size_t;
  48. using body_type = typename args_t::body_t;
  49. using buffer_type = typename args_t::buffer_t;
  50. using ws_stream_comp = ws_stream_cp<derived_t, args_t>;
  51. using super::send;
  52. using super::async_send;
  53. public:
  54. /**
  55. * @brief constructor
  56. */
  57. explicit ws_session_impl_t(
  58. session_mgr_t<derived_t> & sessions,
  59. listener_t & listener,
  60. std::shared_ptr<io_t> rwio,
  61. std::size_t init_buf_size,
  62. std::size_t max_buf_size
  63. )
  64. : super(sessions, listener, std::move(rwio), init_buf_size, max_buf_size)
  65. , ws_stream_cp<derived_t, args_t>()
  66. , ws_send_op <derived_t, args_t>()
  67. {
  68. }
  69. /**
  70. * @brief destructor
  71. */
  72. ~ws_session_impl_t()
  73. {
  74. }
  75. /**
  76. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  77. * After this function is called, this class object cannot be used again.
  78. */
  79. inline void destroy()
  80. {
  81. derived_t& derive = this->derived();
  82. derive.ws_stream_.reset();
  83. super::destroy();
  84. }
  85. /**
  86. * @brief return the websocket stream object reference
  87. */
  88. inline typename args_t::stream_t & stream() noexcept
  89. {
  90. return this->derived().ws_stream();
  91. }
  92. /**
  93. * @brief return the websocket stream object reference
  94. */
  95. inline typename args_t::stream_t const& stream() const noexcept
  96. {
  97. return this->derived().ws_stream();
  98. }
  99. public:
  100. /**
  101. * @brief get this object hash key,used for session map
  102. */
  103. inline key_type hash_key() const noexcept
  104. {
  105. return reinterpret_cast<key_type>(this);
  106. }
  107. /**
  108. * @brief get the websocket upgraged request object
  109. */
  110. inline websocket::request_type& get_upgrade_request() noexcept { return this->upgrade_req_; }
  111. /**
  112. * @brief get the websocket upgraged request object
  113. */
  114. inline const websocket::request_type& get_upgrade_request() const noexcept { return this->upgrade_req_; }
  115. protected:
  116. inline typename super::socket_type& upgrade_stream() noexcept
  117. {
  118. return this->socket();
  119. }
  120. inline typename super::socket_type const& upgrade_stream() const noexcept
  121. {
  122. return this->socket();
  123. }
  124. template<typename C>
  125. inline void _do_init(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  126. {
  127. super::_do_init(this_ptr, ecs);
  128. this->derived()._ws_init(ecs, this->derived().socket());
  129. }
  130. template<typename DeferEvent>
  131. inline void _post_shutdown(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  132. {
  133. ASIO2_LOG_DEBUG("ws_session::_post_shutdown: {} {}", ec.value(), ec.message());
  134. this->derived()._ws_stop(this_ptr, defer_event
  135. {
  136. [this, ec, this_ptr, e = chain.move_event()] (event_queue_guard<derived_t> g) mutable
  137. {
  138. super::_post_shutdown(ec, std::move(this_ptr), defer_event(std::move(e), std::move(g)));
  139. }, chain.move_guard()
  140. });
  141. }
  142. template<typename C, typename DeferEvent>
  143. inline void _handle_connect(
  144. const error_code& ec,
  145. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  146. {
  147. detail::ignore_unused(ec);
  148. derived_t& derive = this->derived();
  149. ASIO2_ASSERT(!ec);
  150. ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread());
  151. asio::dispatch(this->io_->context(), make_allocator(this->wallocator_,
  152. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  153. () mutable
  154. {
  155. derive._ws_start(this_ptr, ecs, derive.socket());
  156. derive._post_read_upgrade_request(std::move(this_ptr), std::move(ecs), std::move(chain));
  157. }));
  158. }
  159. template<class Data, class Callback>
  160. inline bool _do_send(Data& data, Callback&& callback)
  161. {
  162. return this->derived()._ws_send(data, std::forward<Callback>(callback));
  163. }
  164. protected:
  165. template<typename C>
  166. inline void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  167. {
  168. this->derived()._ws_post_recv(std::move(this_ptr), std::move(ecs));
  169. }
  170. template<typename C>
  171. inline void _handle_recv(
  172. const error_code& ec, std::size_t bytes_recvd,
  173. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  174. {
  175. this->derived()._ws_handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  176. }
  177. inline void _fire_upgrade(std::shared_ptr<derived_t>& this_ptr)
  178. {
  179. // the _fire_upgrade must be executed in the thread 0.
  180. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread());
  181. this->listener_.notify(event_type::upgrade, this_ptr);
  182. }
  183. protected:
  184. websocket::request_type upgrade_req_;
  185. };
  186. }
  187. namespace asio2
  188. {
  189. using ws_session_args = detail::template_args_ws_session;
  190. template<class derived_t, class args_t>
  191. using ws_session_impl_t = detail::ws_session_impl_t<derived_t, args_t>;
  192. template<class derived_t>
  193. class ws_session_t : public detail::ws_session_impl_t<derived_t, detail::template_args_ws_session>
  194. {
  195. public:
  196. using detail::ws_session_impl_t<derived_t, detail::template_args_ws_session>::ws_session_impl_t;
  197. };
  198. class ws_session : public ws_session_t<ws_session>
  199. {
  200. public:
  201. using ws_session_t<ws_session>::ws_session_t;
  202. };
  203. }
  204. #if defined(ASIO2_INCLUDE_RATE_LIMIT)
  205. #include <asio2/tcp/tcp_stream.hpp>
  206. namespace asio2
  207. {
  208. struct ws_rate_session_args : public ws_session_args
  209. {
  210. using socket_t = asio2::tcp_stream<asio2::simple_rate_policy>;
  211. using stream_t = websocket::stream<socket_t&>;
  212. };
  213. template<class derived_t>
  214. class ws_rate_session_t : public asio2::ws_session_impl_t<derived_t, ws_rate_session_args>
  215. {
  216. public:
  217. using asio2::ws_session_impl_t<derived_t, ws_rate_session_args>::ws_session_impl_t;
  218. };
  219. class ws_rate_session : public asio2::ws_rate_session_t<ws_rate_session>
  220. {
  221. public:
  222. using asio2::ws_rate_session_t<ws_rate_session>::ws_rate_session_t;
  223. };
  224. }
  225. #endif
  226. #include <asio2/base/detail/pop_options.hpp>
  227. #endif // !__ASIO2_WS_SESSION_HPP__