aop_subscribe.hpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_MQTT_AOP_SUBSCRIBE_HPP__
  11. #define __ASIO2_MQTT_AOP_SUBSCRIBE_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/iopool.hpp>
  16. #include <asio2/base/detail/function_traits.hpp>
  17. #include <asio2/base/detail/util.hpp>
  18. #include <asio2/mqtt/message.hpp>
  19. namespace asio2::detail
  20. {
  21. template<class caller_t, class args_t>
  22. class mqtt_aop_subscribe
  23. {
  24. friend caller_t;
  25. protected:
  26. template<class Message>
  27. inline mqtt::v5::properties_set _check_subscribe_properties(error_code& ec, Message& msg)
  28. {
  29. using message_type = typename detail::remove_cvref_t<Message>;
  30. if constexpr (std::is_same_v<message_type, mqtt::v5::subscribe>)
  31. {
  32. // Get subscription identifier
  33. mqtt::v5::subscription_identifier* sub_id =
  34. msg.properties().template get_if<mqtt::v5::subscription_identifier>();
  35. if (sub_id)
  36. {
  37. // The Subscription Identifier can have the value of 1 to 268,435,455.
  38. // It is a Protocol Error if the Subscription Identifier has a value of 0
  39. auto v = sub_id->value();
  40. if (v < 1 || v > 268435455)
  41. {
  42. ASIO2_ASSERT(false);
  43. ec = mqtt::make_error_code(mqtt::error::protocol_error);
  44. }
  45. else
  46. {
  47. std::ignore = true;
  48. }
  49. }
  50. return msg.properties();
  51. }
  52. else
  53. {
  54. return {};
  55. }
  56. }
  57. // must be server
  58. template<class Message, class Response, bool IsClient = args_t::is_client>
  59. inline std::enable_if_t<!IsClient, void>
  60. _before_subscribe_callback(
  61. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  62. Message& msg, Response& rep)
  63. {
  64. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  65. using message_type = typename detail::remove_cvref_t<Message>;
  66. bool is_v5 = std::is_same_v<message_type, mqtt::v5::subscribe>;
  67. // A SUBACK and UNSUBACK MUST contain the Packet Identifier that was used in the
  68. // corresponding SUBSCRIBE and UNSUBSCRIBE packet respectively [MQTT-2.2.1-6].
  69. rep.packet_id(msg.packet_id());
  70. // subscription properties
  71. mqtt::v5::properties_set props = _check_subscribe_properties(ec, msg);
  72. for (mqtt::subscription& sub : msg.subscriptions().data())
  73. {
  74. // Reason Codes
  75. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901178
  76. // error, the session will be disconnect a later
  77. if (sub.topic_filter().empty())
  78. {
  79. ec = mqtt::make_error_code(mqtt::error::topic_filter_invalid);
  80. rep.add_reason_codes(detail::to_underlying(is_v5 ?
  81. mqtt::error::topic_filter_invalid : mqtt::error::unspecified_error));
  82. continue;
  83. }
  84. mqtt::qos_type qos = sub.qos();
  85. // error, the session will be disconnect a later
  86. if (!mqtt::is_valid_qos(qos))
  87. {
  88. ec = mqtt::make_error_code(mqtt::error::unspecified_error);
  89. rep.add_reason_codes(detail::to_underlying(mqtt::error::unspecified_error));
  90. continue;
  91. }
  92. // not error, but not supported, and the session will not be disconnect
  93. if (detail::to_underlying(qos) > caller->maximum_qos())
  94. {
  95. ec = mqtt::make_error_code(mqtt::error::qos_not_supported);
  96. rep.add_reason_codes(detail::to_underlying(is_v5 ?
  97. mqtt::error::quota_exceeded : mqtt::error::unspecified_error));
  98. continue;
  99. }
  100. // not error, and supported too
  101. rep.add_reason_codes(detail::to_underlying(qos));
  102. typename caller_t::subnode_type node{ caller_ptr, sub, std::move(props) };
  103. std::string_view share_name = node.share_name();
  104. std::string_view topic_filter = node.topic_filter();
  105. if (!share_name.empty())
  106. {
  107. caller->shared_targets().insert(caller_ptr, share_name, topic_filter);
  108. }
  109. bool inserted = caller->subs_map().insert_or_assign(
  110. topic_filter, caller->client_id(), std::move(node)).second;
  111. mqtt::retain_handling_type rh = sub.retain_handling();
  112. if /**/ (rh == mqtt::retain_handling_type::send)
  113. {
  114. _send_retained_messages(caller_ptr, caller, sub);
  115. }
  116. else if (rh == mqtt::retain_handling_type::send_only_new_subscription)
  117. {
  118. if (inserted)
  119. {
  120. _send_retained_messages(caller_ptr, caller, sub);
  121. }
  122. }
  123. }
  124. }
  125. template<class Message, class Response, bool IsClient = args_t::is_client>
  126. inline std::enable_if_t<IsClient, void>
  127. _before_subscribe_callback(
  128. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  129. Message& msg, Response& rep)
  130. {
  131. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  132. ASIO2_ASSERT(false && "client should't recv the subscribe message");
  133. }
  134. inline void _send_retained_messages(
  135. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::subscription& sub)
  136. {
  137. detail::ignore_unused(caller_ptr, caller, sub);
  138. // use push_event to ensure the publish message is sent to clients must after mqtt
  139. // response is sent already.
  140. caller->push_event(
  141. [caller_ptr, caller, sub = std::move(sub), topic_filter = std::string{ sub.topic_filter() }]
  142. (event_queue_guard<caller_t> g) mutable
  143. {
  144. detail::ignore_unused(g);
  145. mqtt::v5::properties_set props;
  146. caller->retained_messages().find(topic_filter, [caller_ptr, caller, &sub, &props]
  147. (mqtt::rmnode& node) mutable
  148. {
  149. std::visit([caller_ptr, caller, &sub, &props](auto& pub) mutable
  150. {
  151. using T = asio2::detail::remove_cvref_t<decltype(pub)>;
  152. if constexpr (mqtt::is_publish_message<T>())
  153. {
  154. caller->_send_publish_to_subscriber(caller_ptr, sub, props, pub);
  155. }
  156. else
  157. {
  158. ASIO2_ASSERT(false);
  159. }
  160. }, node.message.base());
  161. });
  162. });
  163. }
  164. // must be server
  165. inline void _before_user_callback_impl(
  166. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  167. mqtt::v3::subscribe& msg, mqtt::v3::suback& rep)
  168. {
  169. if (_before_subscribe_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  170. return;
  171. }
  172. // must be server
  173. inline void _before_user_callback_impl(
  174. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  175. mqtt::v4::subscribe& msg, mqtt::v4::suback& rep)
  176. {
  177. if (_before_subscribe_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  178. return;
  179. }
  180. // must be server
  181. inline void _before_user_callback_impl(
  182. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  183. mqtt::v5::subscribe& msg, mqtt::v5::suback& rep)
  184. {
  185. if (_before_subscribe_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  186. return;
  187. }
  188. inline void _after_user_callback_impl(
  189. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  190. mqtt::v3::subscribe& msg, mqtt::v3::suback& rep)
  191. {
  192. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  193. }
  194. inline void _after_user_callback_impl(
  195. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  196. mqtt::v4::subscribe& msg, mqtt::v4::suback& rep)
  197. {
  198. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  199. }
  200. inline void _after_user_callback_impl(
  201. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  202. mqtt::v5::subscribe& msg, mqtt::v5::suback& rep)
  203. {
  204. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  205. }
  206. };
  207. }
  208. #endif // !__ASIO2_MQTT_AOP_SUBSCRIBE_HPP__