mqtts_session.hpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #if defined(ASIO2_ENABLE_SSL) || defined(ASIO2_USE_SSL)
  11. #ifndef __ASIO2_MQTTS_SESSION_HPP__
  12. #define __ASIO2_MQTTS_SESSION_HPP__
  13. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  14. #pragma once
  15. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  16. #include <asio2/base/detail/push_options.hpp>
  17. #include <asio2/mqtt/mqtt_session.hpp>
  18. #include <asio2/tcp/impl/ssl_stream_cp.hpp>
  19. namespace asio2::detail
  20. {
  21. struct template_args_mqtts_session : public template_args_mqtt_session
  22. {
  23. using stream_t = asio::ssl::stream<typename template_args_mqtt_session::socket_t&>;
  24. };
  25. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  26. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  27. ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;
  28. ASIO2_CLASS_FORWARD_DECLARE_TCP_SESSION;
  29. template<class derived_t, class args_t = template_args_mqtts_session>
  30. class mqtts_session_impl_t
  31. : public mqtt_session_impl_t<derived_t, args_t>
  32. , public ssl_stream_cp <derived_t, args_t>
  33. {
  34. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  35. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  36. ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;
  37. ASIO2_CLASS_FRIEND_DECLARE_TCP_SESSION;
  38. public:
  39. using super = mqtt_session_impl_t <derived_t, args_t>;
  40. using self = mqtts_session_impl_t<derived_t, args_t>;
  41. using args_type = args_t;
  42. using key_type = std::size_t;
  43. using ssl_stream_comp = ssl_stream_cp<derived_t, args_t>;
  44. using super::send;
  45. using super::async_send;
  46. public:
  47. /**
  48. * @brief constructor
  49. */
  50. explicit mqtts_session_impl_t(
  51. asio::ssl::context & ctx,
  52. mqtt::broker_state<derived_t, args_t>& broker_state,
  53. session_mgr_t <derived_t>& sessions,
  54. listener_t & listener,
  55. std::shared_ptr<io_t> rwio,
  56. std::size_t init_buf_size,
  57. std::size_t max_buf_size
  58. )
  59. : super(broker_state, sessions, listener, std::move(rwio), init_buf_size, max_buf_size)
  60. , ssl_stream_comp(ctx, asio::ssl::stream_base::server)
  61. , ctx_(ctx)
  62. {
  63. }
  64. /**
  65. * @brief destructor
  66. */
  67. ~mqtts_session_impl_t()
  68. {
  69. }
  70. /**
  71. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  72. * After this function is called, this class object cannot be used again.
  73. */
  74. inline void destroy()
  75. {
  76. derived_t& derive = this->derived();
  77. derive.ssl_stream_.reset();
  78. super::destroy();
  79. }
  80. /**
  81. * @brief get the stream object reference
  82. */
  83. inline typename ssl_stream_comp::ssl_stream_type & stream() noexcept
  84. {
  85. return this->derived().ssl_stream();
  86. }
  87. /**
  88. * @brief get the stream object reference
  89. */
  90. inline typename ssl_stream_comp::ssl_stream_type const& stream() const noexcept
  91. {
  92. return this->derived().ssl_stream();
  93. }
  94. public:
  95. /**
  96. * @brief get this object hash key,used for session map
  97. */
  98. inline key_type hash_key() const noexcept
  99. {
  100. return reinterpret_cast<key_type>(this);
  101. }
  102. protected:
  103. template<typename C>
  104. inline void _do_init(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  105. {
  106. super::_do_init(this_ptr, ecs);
  107. this->derived()._ssl_init(ecs, this->derived().socket(), this->ctx_);
  108. }
  109. template<typename C, typename DeferEvent>
  110. inline void _handle_connect(
  111. const error_code& ec,
  112. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  113. {
  114. detail::ignore_unused(ec);
  115. derived_t& derive = this->derived();
  116. ASIO2_ASSERT(!ec);
  117. ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread());
  118. asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
  119. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  120. () mutable
  121. {
  122. derive._ssl_start(this_ptr, ecs, derive.socket(), derive.ctx_);
  123. derive._post_handshake(std::move(this_ptr), std::move(ecs), std::move(chain));
  124. }));
  125. }
  126. template<typename C, typename DeferEvent>
  127. inline void _handle_handshake(
  128. const error_code& ec,
  129. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  130. {
  131. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  132. // Use "sessions_.dispatch" to ensure that the _fire_accept function and the _fire_handshake
  133. // function are fired in the same thread
  134. this->sessions_.dispatch(
  135. [this, ec, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  136. () mutable
  137. {
  138. ASIO2_ASSERT(this->derived().sessions_.io_->running_in_this_thread());
  139. set_last_error(ec);
  140. this->derived()._fire_handshake(this_ptr);
  141. if (ec)
  142. {
  143. this->derived()._do_disconnect(ec, std::move(this_ptr), std::move(chain));
  144. return;
  145. }
  146. super::_handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  147. });
  148. }
  149. template<typename DeferEvent>
  150. inline void _post_shutdown(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  151. {
  152. this->derived()._ssl_stop(this_ptr, defer_event
  153. {
  154. [this, ec, this_ptr, e = chain.move_event()](event_queue_guard<derived_t> g) mutable
  155. {
  156. super::_post_shutdown(ec, std::move(this_ptr), defer_event(std::move(e), std::move(g)));
  157. }, chain.move_guard()
  158. });
  159. }
  160. inline void _fire_handshake(std::shared_ptr<derived_t>& this_ptr)
  161. {
  162. // the _fire_handshake must be executed in the thread 0.
  163. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread());
  164. this->listener_.notify(event_type::handshake, this_ptr);
  165. }
  166. protected:
  167. asio::ssl::context & ctx_;
  168. };
  169. }
  170. namespace asio2
  171. {
  172. using mqtts_session_args = detail::template_args_mqtts_session;
  173. template<class derived_t, class args_t>
  174. using mqtts_session_impl_t = detail::mqtts_session_impl_t<derived_t, args_t>;
  175. template<class derived_t>
  176. class mqtts_session_t : public detail::mqtts_session_impl_t<derived_t, detail::template_args_mqtts_session>
  177. {
  178. public:
  179. using detail::mqtts_session_impl_t<derived_t, detail::template_args_mqtts_session>::mqtts_session_impl_t;
  180. };
  181. class mqtts_session : public mqtts_session_t<mqtts_session>
  182. {
  183. public:
  184. using mqtts_session_t<mqtts_session>::mqtts_session_t;
  185. };
  186. }
  187. #if defined(ASIO2_INCLUDE_RATE_LIMIT)
  188. #include <asio2/tcp/tcp_stream.hpp>
  189. namespace asio2
  190. {
  191. struct mqtts_rate_session_args : public mqtts_session_args
  192. {
  193. using socket_t = asio2::tcp_stream<asio2::simple_rate_policy>;
  194. using stream_t = asio::ssl::stream<socket_t&>;
  195. };
  196. template<class derived_t>
  197. class mqtts_rate_session_t : public asio2::mqtts_session_impl_t<derived_t, mqtts_rate_session_args>
  198. {
  199. public:
  200. using asio2::mqtts_session_impl_t<derived_t, mqtts_rate_session_args>::mqtts_session_impl_t;
  201. };
  202. class mqtts_rate_session : public asio2::mqtts_rate_session_t<mqtts_rate_session>
  203. {
  204. public:
  205. using asio2::mqtts_rate_session_t<mqtts_rate_session>::mqtts_rate_session_t;
  206. };
  207. }
  208. #endif
  209. #include <asio2/base/detail/pop_options.hpp>
  210. #endif // !__ASIO2_MQTTS_SESSION_HPP__
  211. #endif