mqtt_server.hpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_MQTT_SERVER_HPP__
  11. #define __ASIO2_MQTT_SERVER_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/tcp/tcp_server.hpp>
  17. #include <asio2/mqtt/mqtt_session.hpp>
  18. namespace asio2::detail
  19. {
  20. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  21. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  22. ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;
  23. template<class derived_t, class session_t>
  24. class mqtt_server_impl_t
  25. : public tcp_server_impl_t<derived_t, session_t>
  26. , public mqtt_options
  27. , public mqtt_invoker_t <session_t, typename session_t::args_type>
  28. {
  29. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  30. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  31. ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;
  32. public:
  33. using super = tcp_server_impl_t <derived_t, session_t>;
  34. using self = mqtt_server_impl_t<derived_t, session_t>;
  35. using session_type = session_t;
  36. using super::async_send;
  37. public:
  38. /**
  39. * @brief constructor
  40. */
  41. explicit mqtt_server_impl_t(
  42. std::size_t init_buf_size = tcp_frame_size,
  43. std::size_t max_buf_size = mqtt::max_payload,
  44. std::size_t concurrency = default_concurrency() + 1 // The 1 is used for tcp acceptor
  45. )
  46. : super(init_buf_size, max_buf_size, concurrency)
  47. , mqtt_options()
  48. , mqtt_invoker_t<session_t, typename session_t::args_type>()
  49. , broker_state_(*this, *this)
  50. {
  51. }
  52. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  53. explicit mqtt_server_impl_t(
  54. std::size_t init_buf_size,
  55. std::size_t max_buf_size,
  56. Scheduler&& scheduler
  57. )
  58. : super(init_buf_size, max_buf_size, std::forward<Scheduler>(scheduler))
  59. , mqtt_options()
  60. , mqtt_invoker_t<session_t, typename session_t::args_type>()
  61. , broker_state_(*this, *this)
  62. {
  63. }
  64. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  65. explicit mqtt_server_impl_t(Scheduler&& scheduler)
  66. : mqtt_server_impl_t(tcp_frame_size, mqtt::max_payload, std::forward<Scheduler>(scheduler))
  67. {
  68. }
  69. /**
  70. * @brief destructor
  71. */
  72. ~mqtt_server_impl_t()
  73. {
  74. this->stop();
  75. }
  76. /**
  77. * @brief start the server
  78. * @param host - A string identifying a location. May be a descriptive name or
  79. * a numeric address string.
  80. * @param service - A string identifying the requested service. This may be a
  81. * descriptive name or a numeric string corresponding to a port number.
  82. */
  83. template<typename String, typename StrOrInt, typename... Args>
  84. bool start(String&& host, StrOrInt&& service, Args&&... args)
  85. {
  86. return this->derived()._do_start(
  87. std::forward<String>(host), std::forward<StrOrInt>(service),
  88. ecs_helper::make_ecs(asio::transfer_at_least(1),
  89. mqtt::mqtt_match_role, std::forward<Args>(args)...));
  90. }
  91. protected:
  92. template<typename C>
  93. inline void _handle_start(
  94. error_code ec, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  95. {
  96. this->derived()._bind_default_mqtt_handler(ecs);
  97. return super::_handle_start(std::move(ec), std::move(this_ptr), std::move(ecs));
  98. }
  99. inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state)
  100. {
  101. asio::dispatch(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
  102. [this, this_ptr]() mutable
  103. {
  104. this->mqtt_sessions().clear_mqtt_sessions();
  105. }));
  106. super::_post_stop(ec, std::move(this_ptr), old_state);
  107. }
  108. template<typename C>
  109. inline void _bind_default_mqtt_handler(std::shared_ptr<ecs_t<C>>& ecs)
  110. {
  111. detail::ignore_unused(ecs);
  112. // must set default callback for every mqtt message.
  113. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::connect ))) this->on_connect ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  114. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::connack ))) this->on_connack ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  115. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::publish ))) this->on_publish ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  116. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::puback ))) this->on_puback ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  117. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pubrec ))) this->on_pubrec ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  118. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pubrel ))) this->on_pubrel ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  119. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pubcomp ))) this->on_pubcomp ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  120. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::subscribe ))) this->on_subscribe ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  121. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::suback ))) this->on_suback ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  122. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::unsubscribe))) this->on_unsubscribe([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  123. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::unsuback ))) this->on_unsuback ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  124. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pingreq ))) this->on_pingreq ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  125. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pingresp ))) this->on_pingresp ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  126. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::disconnect ))) this->on_disconnect ([](std::shared_ptr<session_t>&, mqtt::message& ) mutable {});
  127. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::auth ))) this->on_auth ([](std::shared_ptr<session_t>&, mqtt::message&, mqtt::message&) mutable {});
  128. }
  129. template<typename... Args>
  130. inline std::shared_ptr<session_t> _make_session(Args&&... args)
  131. {
  132. std::shared_ptr<session_t> p = super::_make_session(std::forward<Args>(args)..., this->broker_state_);
  133. // Copy the parameter configuration of user calls for the "server" to each "session"
  134. p->_mqtt_options_copy_from(*this);
  135. return p;
  136. }
  137. inline auto& invoker () noexcept { return this->broker_state_.invoker_ ; }
  138. inline auto& mqtt_sessions () noexcept { return this->broker_state_.mqtt_sessions_ ; }
  139. inline auto& subs_map () noexcept { return this->broker_state_.subs_map_ ; }
  140. inline auto& shared_targets () noexcept { return this->broker_state_.shared_targets_ ; }
  141. inline auto& retained_messages() noexcept { return this->broker_state_.retained_messages_; }
  142. inline auto const& invoker () const noexcept { return this->broker_state_.invoker_ ; }
  143. inline auto const& mqtt_sessions () const noexcept { return this->broker_state_.mqtt_sessions_ ; }
  144. inline auto const& subs_map () const noexcept { return this->broker_state_.subs_map_ ; }
  145. inline auto const& shared_targets () const noexcept { return this->broker_state_.shared_targets_ ; }
  146. inline auto const& retained_messages() const noexcept { return this->broker_state_.retained_messages_; }
  147. public:
  148. inline auto& security () noexcept { return this->broker_state_.security_; }
  149. inline auto& get_security () noexcept { return this->broker_state_.security_; }
  150. inline auto const& security () const noexcept { return this->broker_state_.security_; }
  151. inline auto const& get_security () const noexcept { return this->broker_state_.security_; }
  152. protected:
  153. ///
  154. mqtt::broker_state<session_t, typename session_t::args_type> broker_state_;
  155. };
  156. }
  157. namespace asio2
  158. {
  159. template<class derived_t, class session_t>
  160. using mqtt_server_impl_t = detail::mqtt_server_impl_t<derived_t, session_t>;
  161. template<class session_t>
  162. class mqtt_server_t : public detail::mqtt_server_impl_t<mqtt_server_t<session_t>, session_t>
  163. {
  164. public:
  165. using detail::mqtt_server_impl_t<mqtt_server_t<session_t>, session_t>::mqtt_server_impl_t;
  166. };
  167. using mqtt_server = mqtt_server_t<mqtt_session>;
  168. }
  169. #include <asio2/base/detail/pop_options.hpp>
  170. #endif // !__ASIO2_MQTT_SERVER_HPP__