aop_publish.hpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_MQTT_AOP_PUBLISH_HPP__
  11. #define __ASIO2_MQTT_AOP_PUBLISH_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/iopool.hpp>
  16. #include <asio2/base/detail/function_traits.hpp>
  17. #include <asio2/base/detail/util.hpp>
  18. #include <asio2/mqtt/message.hpp>
  19. namespace asio2::detail
  20. {
  21. template<class caller_t, class args_t>
  22. class mqtt_aop_publish
  23. {
  24. friend caller_t;
  25. protected:
  26. template<class Message>
  27. inline void _do_check_publish_topic_alias(
  28. error_code& ec, caller_t* caller, Message& msg, std::string& topic_name)
  29. {
  30. // << topic_alias >>
  31. // A Topic Alias of 0 is not permitted. A sender MUST NOT send a PUBLISH packet containing a Topic Alias
  32. // which has the value 0 [MQTT-3.3.2-8].
  33. // << topic_alias_maximum >>
  34. // This value indicates the highest value that the Client will accept as a Topic Alias sent by the Server.
  35. // The Client uses this value to limit the number of Topic Aliases that it is willing to hold on this Connection.
  36. // The Server MUST NOT send a Topic Alias in a PUBLISH packet to the Client greater than Topic Alias Maximum
  37. // [MQTT-3.1.2-26]. A value of 0 indicates that the Client does not accept any Topic Aliases on this connection.
  38. // If Topic Alias Maximum is absent or zero, the Server MUST NOT send any Topic Aliases to the Client [MQTT-3.1.2-27].
  39. mqtt::v5::topic_alias* topic_alias = msg.properties().template get_if<mqtt::v5::topic_alias>();
  40. if (topic_alias)
  41. {
  42. auto alias_value = topic_alias->value();
  43. if (alias_value == 0 || alias_value > caller->topic_alias_maximum())
  44. {
  45. ec = mqtt::make_error_code(mqtt::error::malformed_packet);
  46. return;
  47. }
  48. if (!topic_name.empty())
  49. {
  50. caller->push_topic_alias(alias_value, topic_name);
  51. }
  52. else
  53. {
  54. if (!caller->find_topic_alias(alias_value, topic_name))
  55. {
  56. ec = mqtt::make_error_code(mqtt::error::topic_alias_invalid);
  57. return;
  58. }
  59. }
  60. }
  61. }
  62. template<class Message>
  63. inline void _check_publish_topic_alias(
  64. error_code& ec, caller_t* caller, Message& msg, std::string& topic_name)
  65. {
  66. using message_type = typename detail::remove_cvref_t<Message>;
  67. // << topic_alias >>
  68. // A Topic Alias of 0 is not permitted. A sender MUST NOT send a PUBLISH packet containing a Topic Alias
  69. // which has the value 0 [MQTT-3.3.2-8].
  70. // << topic_alias_maximum >>
  71. // This value indicates the highest value that the Client will accept as a Topic Alias sent by the Server.
  72. // The Client uses this value to limit the number of Topic Aliases that it is willing to hold on this Connection.
  73. // The Server MUST NOT send a Topic Alias in a PUBLISH packet to the Client greater than Topic Alias Maximum
  74. // [MQTT-3.1.2-26]. A value of 0 indicates that the Client does not accept any Topic Aliases on this connection.
  75. // If Topic Alias Maximum is absent or zero, the Server MUST NOT send any Topic Aliases to the Client [MQTT-3.1.2-27].
  76. if constexpr (std::is_same_v<message_type, mqtt::v5::publish>)
  77. {
  78. _do_check_publish_topic_alias(ec, caller, msg, topic_name);
  79. }
  80. else
  81. {
  82. detail::ignore_unused(ec, caller, msg, topic_name);
  83. }
  84. }
  85. // server or client
  86. template<class Message, class Response>
  87. inline void _before_publish_callback(
  88. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  89. Message& msg, Response& rep)
  90. {
  91. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  92. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  93. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  94. // A PUBACK, PUBREC , PUBREL, or PUBCOMP packet MUST contain the same Packet Identifier
  95. // as the PUBLISH packet that was originally sent [MQTT-2.2.1-5].
  96. if (msg.has_packet_id())
  97. {
  98. rep.packet_id(msg.packet_id());
  99. }
  100. mqtt::qos_type qos = msg.qos();
  101. // the qos 0 publish messgae don't need response
  102. if (qos == mqtt::qos_type::at_most_once)
  103. rep.set_send_flag(false);
  104. if (!mqtt::is_valid_qos(qos))
  105. {
  106. ec = mqtt::make_error_code(mqtt::error::malformed_packet);
  107. return;
  108. }
  109. if (detail::to_underlying(qos) > caller->maximum_qos())
  110. {
  111. ec = mqtt::make_error_code(mqtt::error::qos_not_supported);
  112. return;
  113. }
  114. if (msg.retain() && caller->retain_available() == false)
  115. {
  116. ec = mqtt::make_error_code(mqtt::error::retain_not_supported);
  117. return;
  118. }
  119. // The Packet Identifier field is only present in PUBLISH Packets where the QoS level is 1 or 2.
  120. if (detail::to_underlying(qos) > 0 && msg.has_packet_id() == false)
  121. {
  122. ec = mqtt::make_error_code(mqtt::error::malformed_packet); // error code : broker.hivemq.com
  123. return;
  124. }
  125. if (detail::to_underlying(qos) == 0 && msg.has_packet_id() == true)
  126. {
  127. ec = mqtt::make_error_code(mqtt::error::malformed_packet); // error code : broker.hivemq.com
  128. return;
  129. }
  130. std::string topic_name{ msg.topic_name() };
  131. // must first determine whether topic_name is empty, beacuse v5::publish's topic_name maybe empty.
  132. if (!topic_name.empty())
  133. {
  134. if (mqtt::is_topic_name_valid(topic_name) == false)
  135. {
  136. ec = mqtt::make_error_code(mqtt::error::topic_name_invalid);
  137. return;
  138. }
  139. }
  140. if (_check_publish_topic_alias(ec, caller, msg, topic_name); ec)
  141. return;
  142. // All Topic Names and Topic Filters MUST be at least one character long [MQTT-4.7.3-1]
  143. if (topic_name.empty())
  144. {
  145. ec = mqtt::make_error_code(mqtt::error::topic_name_invalid);
  146. return;
  147. }
  148. //// Potentially allow write access for bridge status, otherwise explicitly deny.
  149. //// rc = mosquitto_topic_matches_sub("$SYS/broker/connection/+/state", topic, std::addressof(match));
  150. //if (topic_name.compare(0, 4, "$SYS") == 0)
  151. //{
  152. // ec = mqtt::make_error_code(mqtt::error::topic_name_invalid);
  153. // return;
  154. //}
  155. // Only allow sub/unsub to shared subscriptions
  156. if (topic_name.compare(0, 6, "$share") == 0)
  157. {
  158. ec = mqtt::make_error_code(mqtt::error::topic_name_invalid);
  159. return;
  160. }
  161. constexpr bool is_pubrec =
  162. std::is_same_v<response_type, mqtt::v3::pubrec> ||
  163. std::is_same_v<response_type, mqtt::v4::pubrec> ||
  164. std::is_same_v<response_type, mqtt::v5::pubrec>;
  165. // the client or session sent publish with qos 2 but don't recvd pubrec, and it sent publish
  166. // a later again, so we need sent pubrec directly and return directly
  167. if (msg.qos() == mqtt::qos_type::exactly_once && caller->exactly_once_processing(msg.packet_id()))
  168. {
  169. ASIO2_ASSERT(msg.has_packet_id());
  170. ASIO2_ASSERT(is_pubrec);
  171. // return, then the pubrec will be sent directly
  172. return;
  173. }
  174. if constexpr (is_pubrec)
  175. {
  176. ASIO2_ASSERT(msg.has_packet_id());
  177. ASIO2_ASSERT(msg.qos() == mqtt::qos_type::exactly_once);
  178. caller->exactly_once_start(msg.packet_id());
  179. }
  180. else
  181. {
  182. std::ignore = true;
  183. }
  184. _multicast_publish(caller_ptr, caller, msg, std::move(topic_name));
  185. }
  186. template<class Message, bool IsClient = args_t::is_client>
  187. inline std::enable_if_t<!IsClient, void>
  188. _multicast_publish(
  189. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message&& msg, std::string topic_name)
  190. {
  191. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  192. // use post and push_event to ensure the publish message is sent to clients must
  193. // after mqtt response is sent already.
  194. asio::post(caller->io_->context(), make_allocator(caller->wallocator(),
  195. [this, caller, caller_ptr, msg = std::move(msg), topic_name = std::move(topic_name)]
  196. () mutable
  197. {
  198. caller->push_event(
  199. [this, caller, caller_ptr = std::move(caller_ptr), msg = std::move(msg),
  200. topic_name = std::move(topic_name)]
  201. (event_queue_guard<caller_t> g) mutable
  202. {
  203. detail::ignore_unused(g);
  204. this->_do_multicast_publish(caller_ptr, caller, std::move(msg), std::move(topic_name));
  205. });
  206. }));
  207. }
  208. template<class Message, bool IsClient = args_t::is_client>
  209. inline std::enable_if_t<IsClient, void>
  210. _multicast_publish(
  211. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message&& msg, std::string topic_name)
  212. {
  213. detail::ignore_unused(caller_ptr, caller, msg, topic_name);
  214. }
  215. template<class Message>
  216. inline void _do_multicast_publish(
  217. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message&& msg, std::string topic_name)
  218. {
  219. detail::ignore_unused(caller_ptr, caller, msg);
  220. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  221. // share_name topic_filter
  222. std::set<std::tuple<std::string, std::string>> sent;
  223. if (topic_name.empty())
  224. topic_name = msg.topic_name();
  225. ASIO2_ASSERT(!topic_name.empty());
  226. caller->subs_map().match(topic_name,
  227. [this, caller, &msg, &sent](std::string_view key, auto& node) mutable
  228. {
  229. detail::ignore_unused(this, key);
  230. mqtt::subscription& sub = node.sub;
  231. std::string_view share_name = sub.share_name();
  232. std::string_view topic_filter = sub.topic_filter();
  233. if (share_name.empty())
  234. {
  235. // Non shared subscriptions
  236. auto session_ptr = node.caller.lock();
  237. if (!session_ptr)
  238. return;
  239. // If NL (no local) subscription option is set and
  240. // publisher is the same as subscriber, then skip it.
  241. if (sub.no_local() && session_ptr->hash_key() == caller->hash_key())
  242. return;
  243. // send message
  244. _send_publish_to_subscriber(std::move(session_ptr), node.sub, node.props, msg);
  245. }
  246. else
  247. {
  248. // Shared subscriptions
  249. bool inserted;
  250. std::tie(std::ignore, inserted) = sent.emplace(share_name, topic_filter);
  251. if (inserted)
  252. {
  253. auto session_ptr = caller->shared_targets().get_target(share_name, topic_filter);
  254. if (session_ptr)
  255. {
  256. _send_publish_to_subscriber(std::move(session_ptr), node.sub, node.props, msg);
  257. }
  258. }
  259. }
  260. });
  261. if (msg.retain())
  262. {
  263. _do_retain_publish(caller_ptr, caller, msg, topic_name);
  264. }
  265. }
  266. template<class Message>
  267. inline void _do_retain_publish(
  268. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message&& msg, std::string topic_name)
  269. {
  270. detail::ignore_unused(caller_ptr, caller, msg);
  271. using message_type = typename detail::remove_cvref_t<Message>;
  272. /*
  273. * If the message is marked as being retained, then we
  274. * keep it in case a new subscription is added that matches
  275. * this topic.
  276. *
  277. * @note: The MQTT standard 3.3.1.3 RETAIN makes it clear that
  278. * retained messages are global based on the topic, and
  279. * are not scoped by the client id. So any client may
  280. * publish a retained message on any topic, and the most
  281. * recently published retained message on a particular
  282. * topic is the message that is stored on the server.
  283. *
  284. * @note: The standard doesn't make it clear that publishing
  285. * a message with zero length, but the retain flag not
  286. * set, does not result in any existing retained message
  287. * being removed. However, internet searching indicates
  288. * that most brokers have opted to keep retained messages
  289. * when receiving contents of zero bytes, unless the so
  290. * received message has the retain flag set, in which case
  291. * the retained message is removed.
  292. */
  293. if (msg.payload().empty())
  294. {
  295. caller->retained_messages().erase(topic_name);
  296. }
  297. else
  298. {
  299. std::shared_ptr<asio::steady_timer> expiry_timer;
  300. if constexpr (std::is_same_v<message_type, mqtt::v5::publish>)
  301. {
  302. mqtt::v5::message_expiry_interval* mei =
  303. msg.properties().template get_if<mqtt::v5::message_expiry_interval>();
  304. if (mei)
  305. {
  306. expiry_timer = std::make_shared<asio::steady_timer>(
  307. caller->io_->context(), std::chrono::seconds(mei->value()));
  308. expiry_timer->async_wait(
  309. [caller, topic_name, wp = std::weak_ptr<asio::steady_timer>(expiry_timer)]
  310. (error_code const& ec) mutable
  311. {
  312. if (auto sp = wp.lock())
  313. {
  314. if (!ec)
  315. {
  316. caller->retained_messages().erase(topic_name);
  317. }
  318. }
  319. });
  320. }
  321. }
  322. else
  323. {
  324. std::ignore = true;
  325. }
  326. caller->retained_messages().insert_or_assign(topic_name,
  327. mqtt::rmnode{ msg, std::move(expiry_timer) });
  328. }
  329. }
  330. template<class session_t, class Message>
  331. inline void _send_publish_to_subscriber(
  332. std::shared_ptr<session_t> session, mqtt::subscription& sub, mqtt::v5::properties_set& props,
  333. Message& msg)
  334. {
  335. if (!session)
  336. return;
  337. mqtt::version ver = session->version();
  338. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901026
  339. // The Client and Server assign Packet Identifiers independently of each other.
  340. if /**/ (ver == mqtt::version::v3)
  341. {
  342. _prepare_send_publish(std::move(session), sub, props, msg, mqtt::v3::publish{});
  343. }
  344. else if (ver == mqtt::version::v4)
  345. {
  346. _prepare_send_publish(std::move(session), sub, props, msg, mqtt::v4::publish{});
  347. }
  348. else if (ver == mqtt::version::v5)
  349. {
  350. _prepare_send_publish(std::move(session), sub, props, msg, mqtt::v5::publish{});
  351. }
  352. }
  353. template<class session_t, class Message, class Response>
  354. inline void _prepare_send_publish(
  355. std::shared_ptr<session_t> session, mqtt::subscription& sub, mqtt::v5::properties_set& props,
  356. Message& msg, Response&& rep)
  357. {
  358. using message_type = typename detail::remove_cvref_t<Message>;
  359. using response_type = typename detail::remove_cvref_t<Response>;
  360. // dup
  361. rep.dup(msg.dup());
  362. // qos
  363. rep.qos((std::min)(msg.qos(), sub.qos()));
  364. // retaion
  365. // Bit 3 of the Subscription Options represents the Retain As Published option.
  366. // Retained messages sent when the subscription is established have the RETAIN flag set to 1.
  367. // If 1, Application Messages forwarded using this subscription keep the RETAIN
  368. // flag they were published with.
  369. if (sub.rap())
  370. {
  371. rep.retain(msg.retain());
  372. }
  373. // If 0, Application Messages forwarded using this subscription have the RETAIN
  374. // flag set to 0.
  375. else
  376. {
  377. rep.retain(false);
  378. }
  379. // topic, payload
  380. rep.topic_name(msg.topic_name());
  381. rep.payload(msg.payload());
  382. // properties
  383. if constexpr (std::is_same_v<response_type, mqtt::v5::publish>)
  384. {
  385. if constexpr (std::is_same_v<message_type, mqtt::v5::publish>)
  386. {
  387. rep.properties() = msg.properties();
  388. }
  389. else
  390. {
  391. std::ignore = true;
  392. }
  393. props.for_each([&rep](auto& prop) mutable
  394. {
  395. rep.properties().erase(prop);
  396. rep.properties().add(prop);
  397. });
  398. }
  399. else
  400. {
  401. std::ignore = true;
  402. }
  403. // prepare send
  404. session_t* p = session.get();
  405. p->dispatch([this, session = std::move(session), rep = std::forward<Response>(rep)]() mutable
  406. {
  407. this->_check_send_publish(std::move(session), std::move(rep));
  408. });
  409. }
  410. template<class session_t, class Response>
  411. inline void _check_send_publish(std::shared_ptr<session_t> session, Response&& rep)
  412. {
  413. using response_type = typename detail::remove_cvref_t<Response>;
  414. if (session->is_started())
  415. {
  416. if (session->offline_messages_.empty())
  417. {
  418. auto pub_qos = rep.qos();
  419. if (pub_qos == mqtt::qos_type::at_least_once || pub_qos == mqtt::qos_type::exactly_once)
  420. {
  421. if (auto pid = session->idmgr_.get())
  422. {
  423. // TODO: Probably this should be switched to async_publish?
  424. // Given the async_client / sync_client seperation
  425. // and the way they have different function names,
  426. // it wouldn't be possible for broker.hpp to be
  427. // used with some hypothetical "async_server" in the future.
  428. rep.packet_id(pid);
  429. _do_send_publish(session, std::forward<Response>(rep));
  430. }
  431. else
  432. {
  433. // no packet id available
  434. ASIO2_ASSERT(false);
  435. // offline_messages_ is not empty or packet_id_exhausted
  436. session->offline_messages_.push_back(session->io_->context(),
  437. std::forward<Response>(rep));
  438. }
  439. }
  440. else
  441. {
  442. // A PUBLISH Packet MUST NOT contain a Packet Identifier if its QoS value is set to 0
  443. ASIO2_ASSERT(rep.has_packet_id() == false);
  444. _do_send_publish(session, std::forward<Response>(rep));
  445. }
  446. }
  447. else
  448. {
  449. // send all offline messages first
  450. _send_all_offline_message(session);
  451. _do_send_publish(session, std::forward<Response>(rep));
  452. }
  453. }
  454. else
  455. {
  456. session->offline_messages_.push_back(session->io_->context(), std::forward<Response>(rep));
  457. }
  458. }
  459. template<class session_t, class Response>
  460. inline void _do_send_publish(std::shared_ptr<session_t> session, Response&& rep)
  461. {
  462. session->push_event([session, id = session->life_id(), rep = std::forward<Response>(rep)]
  463. (event_queue_guard<caller_t> g) mutable
  464. {
  465. if (id != session->life_id())
  466. {
  467. set_last_error(asio::error::operation_aborted);
  468. return;
  469. }
  470. session->_do_send(rep, [session, &rep, g = std::move(g)]
  471. (const error_code& ec, std::size_t) mutable
  472. {
  473. // send failed, add it to offline messages
  474. if (ec)
  475. {
  476. session->offline_messages_.push_back(session->io_->context(), std::move(rep));
  477. }
  478. });
  479. });
  480. }
  481. template<class session_t>
  482. inline void _send_all_offline_message(std::shared_ptr<session_t> session)
  483. {
  484. session->offline_messages_.for_each([this, session](mqtt::omnode& node) mutable
  485. {
  486. std::visit([this, session](auto&& pub) mutable
  487. {
  488. this->_do_send_publish(session, std::move(pub));
  489. }, node.message.base());
  490. });
  491. session->offline_messages_.clear();
  492. }
  493. // server or client
  494. inline void _before_user_callback_impl(
  495. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  496. mqtt::v3::publish& msg, mqtt::v3::puback& rep)
  497. {
  498. if (_before_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  499. return;
  500. }
  501. // server or client
  502. inline void _before_user_callback_impl(
  503. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  504. mqtt::v4::publish& msg, mqtt::v4::puback& rep)
  505. {
  506. if (_before_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  507. return;
  508. }
  509. // server or client
  510. inline void _before_user_callback_impl(
  511. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  512. mqtt::v5::publish& msg, mqtt::v5::puback& rep)
  513. {
  514. if (_before_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  515. return;
  516. }
  517. // server or client
  518. inline void _before_user_callback_impl(
  519. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  520. mqtt::v3::publish& msg, mqtt::v3::pubrec& rep)
  521. {
  522. if (_before_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  523. return;
  524. }
  525. // server or client
  526. inline void _before_user_callback_impl(
  527. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  528. mqtt::v4::publish& msg, mqtt::v4::pubrec& rep)
  529. {
  530. if (_before_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  531. return;
  532. }
  533. // server or client
  534. inline void _before_user_callback_impl(
  535. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  536. mqtt::v5::publish& msg, mqtt::v5::pubrec& rep)
  537. {
  538. if (_before_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  539. return;
  540. }
  541. template<class Message, class Response, bool IsClient = args_t::is_client>
  542. inline std::enable_if_t<!IsClient, void>
  543. _do_publish_router(
  544. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  545. Message& msg, Response& rep)
  546. {
  547. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  548. }
  549. template<class Message, class Response, bool IsClient = args_t::is_client>
  550. inline std::enable_if_t<IsClient, void>
  551. _do_publish_router(
  552. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  553. Message& msg, Response& rep)
  554. {
  555. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  556. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  557. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  558. std::string_view topic_name = msg.topic_name();
  559. // client don't need lock
  560. caller->subs_map().match(topic_name, [this, caller, &om](std::string_view key, auto& node) mutable
  561. {
  562. detail::ignore_unused(this, caller, key);
  563. mqtt::subscription& sub = node.sub;
  564. [[maybe_unused]] std::string_view share_name = sub.share_name();
  565. [[maybe_unused]] std::string_view topic_filter = sub.topic_filter();
  566. if (share_name.empty())
  567. {
  568. if (node.callback)
  569. node.callback(om);
  570. }
  571. else
  572. {
  573. }
  574. });
  575. }
  576. // server or client
  577. template<class Message, class Response>
  578. inline void _after_publish_callback(
  579. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  580. Message& msg, Response& rep)
  581. {
  582. detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);
  583. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  584. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  585. [[maybe_unused]] std::string topic_name{ msg.topic_name() };
  586. // << topic_alias >>
  587. // A Topic Alias of 0 is not permitted. A sender MUST NOT send a PUBLISH packet containing a Topic Alias
  588. // which has the value 0 [MQTT-3.3.2-8].
  589. // << topic_alias_maximum >>
  590. // This value indicates the highest value that the Client will accept as a Topic Alias sent by the Server.
  591. // The Client uses this value to limit the number of Topic Aliases that it is willing to hold on this Connection.
  592. // The Server MUST NOT send a Topic Alias in a PUBLISH packet to the Client greater than Topic Alias Maximum
  593. // [MQTT-3.1.2-26]. A value of 0 indicates that the Client does not accept any Topic Aliases on this connection.
  594. // If Topic Alias Maximum is absent or zero, the Server MUST NOT send any Topic Aliases to the Client [MQTT-3.1.2-27].
  595. if constexpr (std::is_same_v<message_type, mqtt::v5::publish>)
  596. {
  597. mqtt::v5::topic_alias* topic_alias = msg.properties().template get_if<mqtt::v5::topic_alias>();
  598. if (topic_alias)
  599. {
  600. caller->find_topic_alias(topic_alias->value(), topic_name);
  601. }
  602. }
  603. else
  604. {
  605. std::ignore = true;
  606. }
  607. _do_publish_router(ec, caller_ptr, caller, om, msg, rep);
  608. }
  609. inline void _after_user_callback_impl(
  610. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  611. mqtt::v3::publish& msg, mqtt::v3::puback& rep)
  612. {
  613. if (_after_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  614. return;
  615. }
  616. inline void _after_user_callback_impl(
  617. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  618. mqtt::v4::publish& msg, mqtt::v4::puback& rep)
  619. {
  620. if (_after_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  621. return;
  622. }
  623. inline void _after_user_callback_impl(
  624. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  625. mqtt::v5::publish& msg, mqtt::v5::puback& rep)
  626. {
  627. if (_after_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  628. return;
  629. }
  630. inline void _after_user_callback_impl(
  631. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  632. mqtt::v3::publish& msg, mqtt::v3::pubrec& rep)
  633. {
  634. if (_after_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  635. return;
  636. }
  637. inline void _after_user_callback_impl(
  638. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  639. mqtt::v4::publish& msg, mqtt::v4::pubrec& rep)
  640. {
  641. if (_after_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  642. return;
  643. }
  644. inline void _after_user_callback_impl(
  645. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  646. mqtt::v5::publish& msg, mqtt::v5::pubrec& rep)
  647. {
  648. if (_after_publish_callback(ec, caller_ptr, caller, om, msg, rep); ec)
  649. return;
  650. }
  651. };
  652. }
  653. #endif // !__ASIO2_MQTT_AOP_PUBLISH_HPP__