mqtts_client.hpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #if defined(ASIO2_ENABLE_SSL) || defined(ASIO2_USE_SSL)
  11. #ifndef __ASIO2_MQTTS_CLIENT_HPP__
  12. #define __ASIO2_MQTTS_CLIENT_HPP__
  13. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  14. #pragma once
  15. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  16. #include <asio2/base/detail/push_options.hpp>
  17. #include <asio2/mqtt/mqtt_client.hpp>
  18. #include <asio2/tcp/impl/ssl_stream_cp.hpp>
  19. #include <asio2/tcp/impl/ssl_context_cp.hpp>
  20. namespace asio2::detail
  21. {
  22. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  23. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  24. ASIO2_CLASS_FORWARD_DECLARE_TCP_CLIENT;
  25. template<class derived_t, class args_t = template_args_mqtt_client>
  26. class mqtts_client_impl_t
  27. : public ssl_context_cp <derived_t, args_t>
  28. , public mqtt_client_impl_t<derived_t, args_t>
  29. , public ssl_stream_cp <derived_t, args_t>
  30. {
  31. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  32. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  33. ASIO2_CLASS_FRIEND_DECLARE_TCP_CLIENT;
  34. public:
  35. using super = mqtt_client_impl_t <derived_t, args_t>;
  36. using self = mqtts_client_impl_t<derived_t, args_t>;
  37. using args_type = args_t;
  38. using buffer_type = typename args_t::buffer_t;
  39. using ssl_context_comp = ssl_context_cp<derived_t, args_t>;
  40. using ssl_stream_comp = ssl_stream_cp <derived_t, args_t>;
  41. using super::send;
  42. using super::async_send;
  43. public:
  44. /**
  45. * @brief constructor
  46. */
  47. template<class... Args>
  48. explicit mqtts_client_impl_t(
  49. asio::ssl::context::method method,
  50. Args&&... args
  51. )
  52. : ssl_context_comp(method)
  53. , super(std::forward<Args>(args)...)
  54. , ssl_stream_comp(*this, asio::ssl::stream_base::client)
  55. {
  56. }
  57. /**
  58. * @brief constructor
  59. */
  60. template<class... Args>
  61. explicit mqtts_client_impl_t(Args&&... args)
  62. : ssl_context_comp(ASIO2_DEFAULT_SSL_METHOD)
  63. , super(std::forward<Args>(args)...)
  64. , ssl_stream_comp(*this, asio::ssl::stream_base::client)
  65. {
  66. }
  67. /**
  68. * @brief constructor
  69. */
  70. explicit mqtts_client_impl_t()
  71. : ssl_context_comp(ASIO2_DEFAULT_SSL_METHOD)
  72. , super()
  73. , ssl_stream_comp(*this, asio::ssl::stream_base::client)
  74. {
  75. }
  76. /**
  77. * @brief destructor
  78. */
  79. ~mqtts_client_impl_t()
  80. {
  81. this->stop();
  82. }
  83. /**
  84. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  85. * After this function is called, this class object cannot be used again.
  86. */
  87. inline void destroy()
  88. {
  89. derived_t& derive = this->derived();
  90. derive.ssl_stream_.reset();
  91. super::destroy();
  92. }
  93. /**
  94. * @brief get the stream object reference
  95. *
  96. */
  97. inline typename ssl_stream_comp::ssl_stream_type& stream() noexcept
  98. {
  99. return this->derived().ssl_stream();
  100. }
  101. /**
  102. * @brief get the stream object reference
  103. *
  104. */
  105. inline typename ssl_stream_comp::ssl_stream_type const& stream() const noexcept
  106. {
  107. return this->derived().ssl_stream();
  108. }
  109. public:
  110. /**
  111. * @brief bind ssl handshake listener
  112. * @param fun - a user defined callback function.
  113. * @param obj - a pointer or reference to a class object, this parameter can be none.
  114. * if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  115. * the class object's pointer or reference.
  116. * Function signature : void()
  117. */
  118. template<class F, class ...C>
  119. inline derived_t & bind_handshake(F&& fun, C&&... obj)
  120. {
  121. this->listener_.bind(event_type::handshake,
  122. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  123. return (this->derived());
  124. }
  125. protected:
  126. template<typename C>
  127. inline void _do_init(std::shared_ptr<ecs_t<C>>& ecs)
  128. {
  129. super::_do_init(ecs);
  130. this->derived()._ssl_init(ecs, this->derived().socket(), *this);
  131. }
  132. template<typename DeferEvent>
  133. inline void _post_shutdown(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  134. {
  135. ASIO2_LOG_DEBUG("mqtts_client::_post_shutdown: {} {}", ec.value(), ec.message());
  136. this->derived()._ssl_stop(this_ptr, defer_event
  137. {
  138. [this, ec, this_ptr, e = chain.move_event()] (event_queue_guard<derived_t> g) mutable
  139. {
  140. super::_post_shutdown(ec, std::move(this_ptr), defer_event(std::move(e), std::move(g)));
  141. }, chain.move_guard()
  142. });
  143. }
  144. template<typename C, typename DeferEvent>
  145. inline void _handle_connect(
  146. const error_code& ec,
  147. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  148. {
  149. set_last_error(ec);
  150. derived_t& derive = this->derived();
  151. if (ec)
  152. {
  153. return derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  154. }
  155. derive._ssl_start(this_ptr, ecs, derive.socket(), *this);
  156. derive._post_handshake(std::move(this_ptr), std::move(ecs), std::move(chain));
  157. }
  158. template<typename C, typename DeferEvent>
  159. inline void _handle_handshake(
  160. const error_code& ec,
  161. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  162. {
  163. set_last_error(ec);
  164. derived_t& derive = this->derived();
  165. derive._fire_handshake(this_ptr);
  166. if (ec)
  167. {
  168. return derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  169. }
  170. super::_handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  171. }
  172. inline void _fire_handshake(std::shared_ptr<derived_t>& this_ptr)
  173. {
  174. // the _fire_handshake must be executed in the thread 0.
  175. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  176. detail::ignore_unused(this_ptr);
  177. this->listener_.notify(event_type::handshake);
  178. }
  179. };
  180. }
  181. namespace asio2
  182. {
  183. using mqtts_client_args = detail::template_args_mqtt_client;
  184. template<class derived_t, class args_t>
  185. using mqtts_client_impl_t = detail::mqtts_client_impl_t<derived_t, args_t>;
  186. /**
  187. * @brief ssl mqtt client
  188. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  189. * asio::error::no_descriptors - Too many open files
  190. */
  191. template<class derived_t>
  192. class mqtts_client_t : public detail::mqtts_client_impl_t<derived_t, detail::template_args_mqtt_client>
  193. {
  194. public:
  195. using detail::mqtts_client_impl_t<derived_t, detail::template_args_mqtt_client>::mqtts_client_impl_t;
  196. };
  197. /**
  198. * @brief ssl mqtt client
  199. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  200. * asio::error::no_descriptors - Too many open files
  201. */
  202. class mqtts_client : public mqtts_client_t<mqtts_client>
  203. {
  204. public:
  205. using mqtts_client_t<mqtts_client>::mqtts_client_t;
  206. };
  207. }
  208. #if defined(ASIO2_INCLUDE_RATE_LIMIT)
  209. #include <asio2/tcp/tcp_stream.hpp>
  210. namespace asio2
  211. {
  212. struct mqtts_rate_client_args : public mqtts_client_args
  213. {
  214. using socket_t = asio2::tcp_stream<asio2::simple_rate_policy>;
  215. };
  216. template<class derived_t>
  217. class mqtts_rate_client_t : public asio2::mqtts_client_impl_t<derived_t, mqtts_rate_client_args>
  218. {
  219. public:
  220. using asio2::mqtts_client_impl_t<derived_t, mqtts_rate_client_args>::mqtts_client_impl_t;
  221. };
  222. class mqtts_rate_client : public asio2::mqtts_rate_client_t<mqtts_rate_client>
  223. {
  224. public:
  225. using asio2::mqtts_rate_client_t<mqtts_rate_client>::mqtts_rate_client_t;
  226. };
  227. }
  228. #endif
  229. #include <asio2/base/detail/pop_options.hpp>
  230. #endif // !__ASIO2_MQTTS_CLIENT_HPP__
  231. #endif