mqtt_invoker.hpp 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_MQTT_INVOKER_HPP__
  11. #define __ASIO2_MQTT_INVOKER_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/iopool.hpp>
  16. #include <asio2/base/define.hpp>
  17. #include <asio2/base/log.hpp>
  18. #include <asio2/base/detail/function_traits.hpp>
  19. #include <asio2/base/detail/util.hpp>
  20. #include <asio2/base/detail/shared_mutex.hpp>
  21. #include <asio2/mqtt/detail/mqtt_topic_util.hpp>
  22. #include <asio2/mqtt/detail/mqtt_subscription_map.hpp>
  23. #include <asio2/mqtt/detail/mqtt_shared_target.hpp>
  24. #include <asio2/mqtt/detail/mqtt_retained_message.hpp>
  25. #include <asio2/mqtt/detail/mqtt_message_router.hpp>
  26. #include <asio2/mqtt/message.hpp>
  27. #if !defined(ASIO2_HEADER_ONLY) && __has_include(<boost/core/type_name.hpp>)
  28. #include <boost/core/type_name.hpp>
  29. #else
  30. #include <asio2/bho/core/type_name.hpp>
  31. #endif
  32. namespace asio2::detail
  33. {
  34. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  35. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  36. ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;
  37. ASIO2_CLASS_FORWARD_DECLARE_TCP_SESSION;
  38. ASIO2_CLASS_FORWARD_DECLARE_TCP_CLIENT;
  39. template<class caller_t, class args_t>
  40. class mqtt_invoker_t
  41. {
  42. friend caller_t;
  43. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  44. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  45. ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;
  46. ASIO2_CLASS_FRIEND_DECLARE_TCP_SESSION;
  47. ASIO2_CLASS_FRIEND_DECLARE_TCP_CLIENT;
  48. protected:
  49. struct dummy {};
  50. public:
  51. using self = mqtt_invoker_t<caller_t, args_t>;
  52. using handler_type = std::function<
  53. void(error_code&, std::shared_ptr<caller_t>&, caller_t*, std::string_view&)>;
  54. /**
  55. * @brief constructor
  56. */
  57. mqtt_invoker_t() noexcept : mqtt_handlers_() {}
  58. /**
  59. * @brief destructor
  60. */
  61. ~mqtt_invoker_t() = default;
  62. /**
  63. * @brief bind connect listener
  64. * @param fun - a user defined callback function.
  65. * @li Function signature :
  66. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  67. * mqtt::v4::connect& msg, mqtt::v4::connack& rep) or v3 or v5
  68. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  69. * mqtt::message& msg, mqtt::message& rep)
  70. * client : Don't need
  71. */
  72. template<class F, class ...C>
  73. inline self& on_connect(F&& fun, C&&... obj)
  74. {
  75. this->_bind(mqtt::control_packet_type::connect, std::forward<F>(fun), std::forward<C>(obj)...);
  76. return (*this);
  77. }
  78. /**
  79. * @brief bind connack listener
  80. * @param fun - a user defined callback function.
  81. * @li Function signature : server : Don't need
  82. * client : void(mqtt::v4::connack& msg) or v3 or v5
  83. * or : void(mqtt::message& msg)
  84. */
  85. template<class F, class ...C>
  86. inline self& on_connack(F&& fun, C&&... obj)
  87. {
  88. this->_bind(mqtt::control_packet_type::connack, std::forward<F>(fun), std::forward<C>(obj)...);
  89. return (*this);
  90. }
  91. /**
  92. * @brief bind publish listener
  93. * @param fun - a user defined callback function.
  94. * @li Function signature :
  95. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  96. * mqtt::v4::publish& msg, mqtt::v4::puback rep) or v3 or v5
  97. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  98. * mqtt::message& msg, mqtt::message rep)
  99. * client : void(mqtt::v4::publish& msg, mqtt::v4::puback rep) or v3 or v5
  100. * or : void(mqtt::message& msg, mqtt::message rep)
  101. */
  102. template<class F, class ...C>
  103. inline self& on_publish(F&& fun, C&&... obj)
  104. {
  105. this->_bind(mqtt::control_packet_type::publish, std::forward<F>(fun), std::forward<C>(obj)...);
  106. return (*this);
  107. }
  108. /**
  109. * @brief bind puback listener
  110. * @param fun - a user defined callback function.
  111. * @li Function signature :
  112. * server : void(std::shared_ptr<mqtt_session>& session_ptr, mqtt::v4::puback& msg) or v3 or v5
  113. * or : void(std::shared_ptr<mqtt_session>& session_ptr, mqtt::message& msg)
  114. * client : void(mqtt::v4::puback& puback) or v3 or v5
  115. * or : void(mqtt::message& msg)
  116. */
  117. template<class F, class ...C>
  118. inline self& on_puback(F&& fun, C&&... obj)
  119. {
  120. this->_bind(mqtt::control_packet_type::puback, std::forward<F>(fun), std::forward<C>(obj)...);
  121. return (*this);
  122. }
  123. /**
  124. * @brief bind pubrec listener
  125. * @param fun - a user defined callback function.
  126. * @li Function signature :
  127. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  128. * mqtt::v4::pubrec& msg, mqtt::v4::pubrel& rep) or v3 or v5
  129. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  130. * mqtt::message& msg, mqtt::message& rep)
  131. * client : void(mqtt::v4::pubrec& msg, mqtt::v4::pubrel& rep) or v3 or v5
  132. * or : void(mqtt::message& msg, mqtt::message& rep)
  133. */
  134. template<class F, class ...C>
  135. inline self& on_pubrec(F&& fun, C&&... obj)
  136. {
  137. this->_bind(mqtt::control_packet_type::pubrec, std::forward<F>(fun), std::forward<C>(obj)...);
  138. return (*this);
  139. }
  140. /**
  141. * @brief bind pubrel listener
  142. * @param fun - a user defined callback function.
  143. * @li Function signature :
  144. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  145. * mqtt::v4::pubrel& msg, mqtt::v4::pubcomp& rep) or v3 or v5
  146. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  147. * mqtt::message& msg, mqtt::message& rep)
  148. * client : void(mqtt::v4::pubrel& msg, mqtt::v4::pubcomp& rep) or v3 or v5
  149. * or : void(mqtt::message& msg, mqtt::message& rep)
  150. */
  151. template<class F, class ...C>
  152. inline self& on_pubrel(F&& fun, C&&... obj)
  153. {
  154. this->_bind(mqtt::control_packet_type::pubrel, std::forward<F>(fun), std::forward<C>(obj)...);
  155. return (*this);
  156. }
  157. /**
  158. * @brief bind pubcomp listener
  159. * @param fun - a user defined callback function.
  160. * @li Function signature : server : void(std::shared_ptr<mqtt_session>& session_ptr,
  161. * mqtt::v4::pubcomp& msg) or v3 or v5
  162. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  163. * mqtt::message& msg)
  164. * client : void(mqtt::v4::pubcomp& msg) or v3 or v5
  165. * or : void(mqtt::message& msg)
  166. */
  167. template<class F, class ...C>
  168. inline self& on_pubcomp(F&& fun, C&&... obj)
  169. {
  170. this->_bind(mqtt::control_packet_type::pubcomp, std::forward<F>(fun), std::forward<C>(obj)...);
  171. return (*this);
  172. }
  173. /**
  174. * @brief bind subscribe listener
  175. * @param fun - a user defined callback function.
  176. * @li Function signature :
  177. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  178. * mqtt::v4::subscribe& msg, mqtt::v4::suback& rep) or v3 or v5
  179. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  180. * mqtt::message& msg, mqtt::message& rep)
  181. * client : Don't need
  182. */
  183. template<class F, class ...C>
  184. inline self& on_subscribe(F&& fun, C&&... obj)
  185. {
  186. this->_bind(mqtt::control_packet_type::subscribe, std::forward<F>(fun), std::forward<C>(obj)...);
  187. return (*this);
  188. }
  189. /**
  190. * @brief bind suback listener
  191. * @param fun - a user defined callback function.
  192. * @li Function signature : server : Don't need
  193. * client : void(mqtt::v4::suback& msg) or v3 or v5
  194. * or : void(mqtt::message& msg)
  195. */
  196. template<class F, class ...C>
  197. inline self& on_suback(F&& fun, C&&... obj)
  198. {
  199. this->_bind(mqtt::control_packet_type::suback, std::forward<F>(fun), std::forward<C>(obj)...);
  200. return (*this);
  201. }
  202. /**
  203. * @brief bind unsubscribe listener
  204. * @param fun - a user defined callback function.
  205. * @li Function signature :
  206. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  207. * mqtt::v4::unsubscribe& msg, mqtt::v4::unsuback& rep) or v3 or v5
  208. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  209. * mqtt::message& msg, mqtt::message& rep)
  210. * client : Don't need
  211. */
  212. template<class F, class ...C>
  213. inline self& on_unsubscribe(F&& fun, C&&... obj)
  214. {
  215. this->_bind(mqtt::control_packet_type::unsubscribe, std::forward<F>(fun), std::forward<C>(obj)...);
  216. return (*this);
  217. }
  218. /**
  219. * @brief bind unsuback listener
  220. * @param fun - a user defined callback function.
  221. * @li Function signature : server : Don't need
  222. * client : void(mqtt::v4::unsuback& msg) or v3 or v5
  223. * or : void(mqtt::message& msg)
  224. */
  225. template<class F, class ...C>
  226. inline self& on_unsuback(F&& fun, C&&... obj)
  227. {
  228. this->_bind(mqtt::control_packet_type::unsuback, std::forward<F>(fun), std::forward<C>(obj)...);
  229. return (*this);
  230. }
  231. /**
  232. * @brief bind pingreq listener
  233. * @param fun - a user defined callback function.
  234. * @li Function signature :
  235. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  236. * mqtt::v4::pingreq& msg, mqtt::v4::pingresp& rep) or v3 or v5
  237. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  238. * mqtt::message& msg, mqtt::message& rep)
  239. * client : Don't need
  240. */
  241. template<class F, class ...C>
  242. inline self& on_pingreq(F&& fun, C&&... obj)
  243. {
  244. this->_bind(mqtt::control_packet_type::pingreq, std::forward<F>(fun), std::forward<C>(obj)...);
  245. return (*this);
  246. }
  247. /**
  248. * @brief bind pingresp listener
  249. * @param fun - a user defined callback function.
  250. * @li Function signature : server : Don't need
  251. * client : void(mqtt::v4::pingresp& msg) or v3 or v5
  252. * or : void(mqtt::message& msg)
  253. */
  254. template<class F, class ...C>
  255. inline self& on_pingresp(F&& fun, C&&... obj)
  256. {
  257. this->_bind(mqtt::control_packet_type::pingresp, std::forward<F>(fun), std::forward<C>(obj)...);
  258. return (*this);
  259. }
  260. /**
  261. * @brief bind disconnect listener
  262. * @param fun - a user defined callback function.
  263. * @li Function signature :
  264. * server : void(std::shared_ptr<mqtt_session>& session_ptr, mqtt::v4::disconnect& msg) or v3 or v5
  265. * or : void(std::shared_ptr<mqtt_session>& session_ptr, mqtt::message& msg)
  266. * client : void(mqtt::v4::disconnect& msg) or v3 or v5
  267. * or : void(mqtt::message& msg)
  268. */
  269. template<class F, class ...C>
  270. inline self& on_disconnect(F&& fun, C&&... obj)
  271. {
  272. this->_bind(mqtt::control_packet_type::disconnect, std::forward<F>(fun), std::forward<C>(obj)...);
  273. return (*this);
  274. }
  275. /**
  276. * @brief bind auth listener
  277. * @param fun - a user defined callback function.
  278. * @li Function signature :
  279. * server : void(std::shared_ptr<mqtt_session>& session_ptr,
  280. * mqtt::v5::auth& msg, mqtt::v5::connack& rep)
  281. * or : void(std::shared_ptr<mqtt_session>& session_ptr,
  282. * mqtt::message& msg, mqtt::message& rep)
  283. * client : void(mqtt::v5::auth& msg, mqtt::v5::auth& rep)
  284. * or : void(mqtt::message& msg, mqtt::message& rep)
  285. */
  286. template<class F, class ...C>
  287. inline self& on_auth(F&& fun, C&&... obj)
  288. {
  289. this->_bind(mqtt::control_packet_type::auth, std::forward<F>(fun), std::forward<C>(obj)...);
  290. return (*this);
  291. }
  292. protected:
  293. template<class F>
  294. inline void _bind(mqtt::control_packet_type type, F f)
  295. {
  296. this->_do_bind(type, std::move(f), ((dummy*)nullptr));
  297. }
  298. template<class F, class C>
  299. inline void _bind(mqtt::control_packet_type type, F f, C& c)
  300. {
  301. this->_do_bind(type, std::move(f), std::addressof(c));
  302. }
  303. template<class F, class C>
  304. inline void _bind(mqtt::control_packet_type type, F f, C* c)
  305. {
  306. this->_do_bind(type, std::move(f), c);
  307. }
  308. template<class F, class C>
  309. inline void _do_bind(mqtt::control_packet_type type, F f, C* c)
  310. {
  311. asio2::unique_locker g(this->mqtt_invoker_mutex_);
  312. this->mqtt_handlers_[detail::to_underlying(type)] = std::make_shared<handler_type>(std::bind(
  313. &self::template _proxy<F, C>,
  314. this, std::move(f), c,
  315. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
  316. }
  317. template<class F, class C>
  318. inline void _proxy(F& f, C* c, error_code& ec,
  319. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  320. {
  321. using fun_traits_type = function_traits<F>;
  322. _argc_proxy<fun_traits_type::argc>(f, c, ec, caller_ptr, caller, data);
  323. }
  324. template<class F, class C, class M>
  325. inline void _do_argc_1_proxy(F& f, C* c,
  326. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, M& msg)
  327. {
  328. using fun_traits_type = function_traits<F>;
  329. using arg0_type = typename std::remove_cv_t<std::remove_reference_t<
  330. typename fun_traits_type::template args<0>::type>>;
  331. using message_type = arg0_type;
  332. if constexpr (std::is_same_v<message_type, mqtt::message>)
  333. {
  334. ec.clear();
  335. this->_do_client_no_response(f, c, ec, caller_ptr, caller, msg, msg);
  336. }
  337. else
  338. {
  339. message_type* pmsg = std::get_if<message_type>(std::addressof(msg.variant()));
  340. if (pmsg)
  341. {
  342. ec.clear();
  343. this->_do_client_no_response(f, c, ec, caller_ptr, caller, msg, *pmsg);
  344. }
  345. else
  346. {
  347. this->_do_no_match_callback(f, ec, caller_ptr, caller, pmsg);
  348. }
  349. }
  350. }
  351. // Argc == 1 : must be client, the callback signature : void (mqtt::xxx_message&)
  352. template<std::size_t Argc, class F, class C>
  353. typename std::enable_if_t<Argc == 1>
  354. inline _argc_proxy(F& f, C* c,
  355. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  356. {
  357. mqtt::data_to_message(caller->version(), data, [this, &f, &c, &ec, &caller_ptr, caller]
  358. (auto msg) mutable
  359. {
  360. if (msg.empty() || asio2::get_last_error())
  361. {
  362. this->_do_malformed_packet(f, ec ? ec : asio2::get_last_error(), caller_ptr, caller);
  363. return;
  364. }
  365. this->_do_argc_1_proxy(f, c, ec, caller_ptr, caller, msg);
  366. });
  367. }
  368. template<class F, class C, class M>
  369. inline void _do_argc_2_proxy_server(F& f, C* c,
  370. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, M& msg)
  371. {
  372. using fun_traits_type = function_traits<F>;
  373. using message_type = typename std::remove_cv_t<std::remove_reference_t<
  374. typename fun_traits_type::template args<1>::type>>;
  375. if constexpr (std::is_same_v<message_type, mqtt::message>)
  376. {
  377. ec.clear();
  378. this->_do_server_no_response(f, c, ec, caller_ptr, caller, msg, msg);
  379. }
  380. else
  381. {
  382. message_type* pmsg = std::get_if<message_type>(std::addressof(msg.variant()));
  383. if (pmsg)
  384. {
  385. ec.clear();
  386. this->_do_server_no_response(f, c, ec, caller_ptr, caller, msg, *pmsg);
  387. }
  388. else
  389. {
  390. this->_do_no_match_callback(f, ec, caller_ptr, caller, pmsg);
  391. }
  392. }
  393. }
  394. template<class F, class C>
  395. inline void _argc_2_proxy_server(F& f, C* c,
  396. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  397. {
  398. mqtt::data_to_message(caller->version(), data, [this, &f, &c, &ec, &caller_ptr, caller]
  399. (auto msg) mutable
  400. {
  401. if (msg.empty() || asio2::get_last_error())
  402. {
  403. this->_do_malformed_packet(f, ec ? ec : asio2::get_last_error(), caller_ptr, caller);
  404. return;
  405. }
  406. this->_do_argc_2_proxy_server(f, c, ec, caller_ptr, caller, msg);
  407. });
  408. }
  409. template<class F, class C, class M>
  410. inline void _do_argc_2_proxy_client(F& f, C* c,
  411. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, M& msg)
  412. {
  413. using fun_traits_type = function_traits<F>;
  414. using arg0_type = typename std::remove_cv_t<std::remove_reference_t<
  415. typename fun_traits_type::template args<0>::type>>;
  416. using message_type = arg0_type;
  417. using response_type = typename std::remove_cv_t<std::remove_reference_t<
  418. typename fun_traits_type::template args<1>::type>>;
  419. if constexpr (std::is_same_v<message_type, mqtt::message>)
  420. {
  421. ec.clear();
  422. this->_do_client_with_response(
  423. f, c, ec, caller_ptr, caller, msg, msg, response_type{});
  424. }
  425. else
  426. {
  427. message_type* pmsg = std::get_if<message_type>(std::addressof(msg.variant()));
  428. if (pmsg)
  429. {
  430. ec.clear();
  431. this->_do_client_with_response(
  432. f, c, ec, caller_ptr, caller, msg, *pmsg, response_type{});
  433. }
  434. else
  435. {
  436. this->_do_no_match_callback(f, ec, caller_ptr, caller, pmsg);
  437. }
  438. }
  439. }
  440. template<class F, class C>
  441. inline void _argc_2_proxy_client(F& f, C* c,
  442. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  443. {
  444. mqtt::data_to_message(caller->version(), data, [this, &f, &c, &ec, &caller_ptr, caller]
  445. (auto msg) mutable
  446. {
  447. if (msg.empty() || asio2::get_last_error())
  448. {
  449. this->_do_malformed_packet(f, ec ? ec : asio2::get_last_error(), caller_ptr, caller);
  450. return;
  451. }
  452. this->_do_argc_2_proxy_client(f, c, ec, caller_ptr, caller, msg);
  453. });
  454. }
  455. // Argc == 2 : client or server
  456. // if client, the callback signature : void (mqtt::xxx_message& message, mqtt::xxx_message& response)
  457. // if server, the callback signature : void (std::shared_ptr<xxx_session>& session, mqtt::xxx_message& message)
  458. template<std::size_t Argc, class F, class C>
  459. typename std::enable_if_t<Argc == 2>
  460. inline _argc_proxy(F& f, C* c,
  461. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  462. {
  463. using fun_traits_type = function_traits<F>;
  464. using arg0_type = typename std::remove_cv_t<std::remove_reference_t<
  465. typename fun_traits_type::template args<0>::type>>;
  466. // must be server
  467. if constexpr (std::is_same_v<std::shared_ptr<caller_t>, arg0_type>)
  468. {
  469. this->_argc_2_proxy_server(f, c, ec, caller_ptr, caller, data);
  470. }
  471. // must be client
  472. else
  473. {
  474. this->_argc_2_proxy_client(f, c, ec, caller_ptr, caller, data);
  475. }
  476. }
  477. template<class F, class C, class M>
  478. inline void _do_argc_3_proxy(F& f, C* c,
  479. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, M& msg)
  480. {
  481. using fun_traits_type = function_traits<F>;
  482. using arg0_type = typename std::remove_cv_t<std::remove_reference_t<
  483. typename fun_traits_type::template args<0>::type>>;
  484. static_assert(std::is_same_v<std::shared_ptr<caller_t>, arg0_type>);
  485. using message_type = typename std::remove_cv_t<std::remove_reference_t<
  486. typename fun_traits_type::template args<1>::type>>;
  487. using response_type = typename std::remove_cv_t<std::remove_reference_t<
  488. typename fun_traits_type::template args<2>::type>>;
  489. if constexpr (std::is_same_v<message_type, mqtt::message>)
  490. {
  491. ec.clear();
  492. this->_do_server_with_response(
  493. f, c, ec, caller_ptr, caller, msg, msg, response_type{});
  494. }
  495. else
  496. {
  497. message_type* pmsg = std::get_if<message_type>(std::addressof(msg.variant()));
  498. if (pmsg)
  499. {
  500. ec.clear();
  501. this->_do_server_with_response(
  502. f, c, ec, caller_ptr, caller, msg, *pmsg, response_type{});
  503. }
  504. else
  505. {
  506. this->_do_no_match_callback(f, ec, caller_ptr, caller, pmsg);
  507. }
  508. }
  509. }
  510. // Argc == 3 : must be server, the callback signature :
  511. // void (std::shared_ptr<xxx_session>&, mqtt::xxx_message& message, mqtt::xxx_message& response)
  512. template<std::size_t Argc, class F, class C>
  513. typename std::enable_if_t<Argc == 3>
  514. inline _argc_proxy(F& f, C* c,
  515. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  516. {
  517. mqtt::data_to_message(caller->version(), data, [this, &f, &c, &ec, &caller_ptr, caller]
  518. (auto msg) mutable
  519. {
  520. if (msg.empty() || asio2::get_last_error())
  521. {
  522. this->_do_malformed_packet(f, ec ? ec : asio2::get_last_error(), caller_ptr, caller);
  523. return;
  524. }
  525. this->_do_argc_3_proxy(f, c, ec, caller_ptr, caller, msg);
  526. });
  527. }
  528. template<class F>
  529. inline void _do_malformed_packet(F& f,
  530. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller)
  531. {
  532. detail::ignore_unused(f);
  533. if (!ec)
  534. ec = mqtt::make_error_code(mqtt::error::malformed_packet);
  535. this->_handle_mqtt_error(ec, caller_ptr, caller);
  536. }
  537. template<class F, class M>
  538. inline void _do_no_match_callback(F& f,
  539. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, M* pmsg)
  540. {
  541. detail::ignore_unused(f, pmsg);
  542. #if !defined(ASIO2_HEADER_ONLY) && __has_include(<boost/core/type_name.hpp>)
  543. ASIO2_LOG_INFOR("The user callback function signature do not match : {}({} ...)"
  544. , boost::core::type_name<detail::remove_cvref_t<F>>()
  545. , boost::core::type_name<detail::remove_cvref_t<M>>()
  546. );
  547. #else
  548. ASIO2_LOG_INFOR("The user callback function signature do not match : {}({} ...)"
  549. , bho::core::type_name<detail::remove_cvref_t<F>>()
  550. , bho::core::type_name<detail::remove_cvref_t<M>>()
  551. );
  552. #endif
  553. //ASIO2_ASSERT(false &&
  554. // "The parameters of the user callback function do not match."
  555. // " Check that the parameters of your callback function are of the correct type");
  556. if (!ec)
  557. ec = mqtt::make_error_code(mqtt::error::malformed_packet);
  558. this->_handle_mqtt_error(ec, caller_ptr, caller);
  559. }
  560. template<typename F, typename C, class Message>
  561. inline void _do_client_no_response(F& f, C* c,
  562. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  563. Message& msg)
  564. {
  565. caller->_before_user_callback(ec, caller_ptr, caller, om, msg);
  566. if (ec)
  567. return this->_handle_mqtt_error(ec, caller_ptr, caller);
  568. this->_invoke_user_callback(f, c, msg);
  569. caller->_match_router(om);
  570. caller->_after_user_callback(ec, caller_ptr, caller, om, msg);
  571. this->_handle_mqtt_error(ec, caller_ptr, caller);
  572. }
  573. template<typename F, typename C, class Message>
  574. inline void _do_server_no_response(F& f, C* c,
  575. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  576. Message& msg)
  577. {
  578. caller->_before_user_callback(ec, caller_ptr, caller, om, msg);
  579. if (ec)
  580. return this->_handle_mqtt_error(ec, caller_ptr, caller);
  581. this->_invoke_user_callback(f, c, caller_ptr, msg);
  582. caller->_after_user_callback(ec, caller_ptr, caller, om, msg);
  583. this->_handle_mqtt_error(ec, caller_ptr, caller);
  584. }
  585. template<typename F, typename C, class Message, class Response>
  586. inline void _do_client_with_response(F& f, C* c,
  587. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  588. Message& msg, Response rep)
  589. {
  590. this->_init_response(ec, caller_ptr, caller, msg, rep);
  591. caller->_before_user_callback(ec, caller_ptr, caller, om, msg, rep);
  592. if (ec)
  593. return this->_handle_mqtt_error(ec, caller_ptr, caller);
  594. this->_invoke_user_callback(f, c, msg, rep);
  595. caller->_after_user_callback(ec, caller_ptr, caller, om, msg, rep);
  596. this->_send_mqtt_response(ec, caller_ptr, caller, msg, std::move(rep));
  597. this->_handle_mqtt_error(ec, caller_ptr, caller);
  598. }
  599. template<typename F, typename C, class Message, class Response>
  600. inline void _do_server_with_response(F& f, C* c,
  601. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,
  602. Message& msg, Response rep)
  603. {
  604. this->_init_response(ec, caller_ptr, caller, msg, rep);
  605. caller->_before_user_callback(ec, caller_ptr, caller, om, msg, rep);
  606. if (ec)
  607. return this->_handle_mqtt_error(ec, caller_ptr, caller);
  608. this->_invoke_user_callback(f, c, caller_ptr, msg, rep);
  609. caller->_after_user_callback(ec, caller_ptr, caller, om, msg, rep);
  610. this->_send_mqtt_response(ec, caller_ptr, caller, msg, std::move(rep));
  611. this->_handle_mqtt_error(ec, caller_ptr, caller);
  612. }
  613. template<typename F, typename C, typename... Args>
  614. inline void _invoke_user_callback(F& f, C* c, Args&&... args)
  615. {
  616. detail::ignore_unused(c);
  617. if constexpr (std::is_same_v<detail::remove_cvref_t<C>, dummy>)
  618. f(std::forward<Args>(args)...);
  619. else
  620. (c->*f)(std::forward<Args>(args)...);
  621. }
  622. template<class Message, class Response>
  623. typename std::enable_if_t<std::is_same_v<typename detail::remove_cvref_t<Message>, mqtt::message>>
  624. inline _init_response(error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  625. Message& msg, Response& rep)
  626. {
  627. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  628. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  629. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  630. if constexpr (std::is_same_v<response_type, mqtt::message>)
  631. {
  632. if constexpr (std::is_same_v<message_type, mqtt::message>)
  633. {
  634. std::visit([this, &ec, &caller_ptr, &caller, &rep](auto& pm) mutable
  635. {
  636. this->_init_response(ec, caller_ptr, caller, pm, rep);
  637. }, msg.variant());
  638. }
  639. else
  640. {
  641. std::ignore = true;
  642. }
  643. }
  644. else
  645. {
  646. std::ignore = true;
  647. }
  648. }
  649. template<class Message, class Response>
  650. inline void _init_connect_response(
  651. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  652. {
  653. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  654. mqtt::version ver = caller->version();
  655. if /**/ (ver == mqtt::version::v3)
  656. {
  657. rep = mqtt::v3::connack{};
  658. }
  659. else if (ver == mqtt::version::v4)
  660. {
  661. rep = mqtt::v4::connack{};
  662. }
  663. else if (ver == mqtt::version::v5)
  664. {
  665. rep = mqtt::v5::connack{};
  666. }
  667. }
  668. template<class Message, class Response>
  669. inline void _init_publish_response(
  670. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  671. {
  672. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  673. mqtt::version ver = caller->version();
  674. if /**/ (ver == mqtt::version::v3)
  675. {
  676. switch (msg.qos())
  677. {
  678. // the qos 0 publish messgae don't need response, here just a placeholder,
  679. // if has't set the rep to a msg, the _before_user_callback_impl can't be
  680. // called correctly.
  681. case mqtt::qos_type::at_most_once : rep = mqtt::v3::puback{}; break;
  682. case mqtt::qos_type::at_least_once: rep = mqtt::v3::puback{}; break;
  683. case mqtt::qos_type::exactly_once : rep = mqtt::v3::pubrec{}; break;
  684. default:break;
  685. }
  686. }
  687. else if (ver == mqtt::version::v4)
  688. {
  689. switch (msg.qos())
  690. {
  691. // the qos 0 publish messgae don't need response, here just a placeholder,
  692. // if has't set the rep to a msg, the _before_user_callback_impl can't be
  693. // called correctly.
  694. case mqtt::qos_type::at_most_once : rep = mqtt::v4::puback{}; break;
  695. case mqtt::qos_type::at_least_once: rep = mqtt::v4::puback{}; break;
  696. case mqtt::qos_type::exactly_once : rep = mqtt::v4::pubrec{}; break;
  697. default:break;
  698. }
  699. }
  700. else if (ver == mqtt::version::v5)
  701. {
  702. switch (msg.qos())
  703. {
  704. // the qos 0 publish messgae don't need response, here just a placeholder,
  705. // if has't set the rep to a msg, the _before_user_callback_impl can't be
  706. // called correctly.
  707. case mqtt::qos_type::at_most_once : rep = mqtt::v5::puback{}; break;
  708. case mqtt::qos_type::at_least_once: rep = mqtt::v5::puback{}; break;
  709. case mqtt::qos_type::exactly_once : rep = mqtt::v5::pubrec{}; break;
  710. default:break;
  711. }
  712. }
  713. }
  714. template<class Message, class Response>
  715. inline void _init_pubrec_response(
  716. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  717. {
  718. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  719. mqtt::version ver = caller->version();
  720. if /**/ (ver == mqtt::version::v3)
  721. {
  722. rep = mqtt::v3::pubrel{};
  723. }
  724. else if (ver == mqtt::version::v4)
  725. {
  726. rep = mqtt::v4::pubrel{};
  727. }
  728. else if (ver == mqtt::version::v5)
  729. {
  730. rep = mqtt::v5::pubrel{};
  731. }
  732. }
  733. template<class Message, class Response>
  734. inline void _init_pubrel_response(
  735. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  736. {
  737. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  738. mqtt::version ver = caller->version();
  739. if /**/ (ver == mqtt::version::v3)
  740. {
  741. rep = mqtt::v3::pubcomp{};
  742. }
  743. else if (ver == mqtt::version::v4)
  744. {
  745. rep = mqtt::v4::pubcomp{};
  746. }
  747. else if (ver == mqtt::version::v5)
  748. {
  749. rep = mqtt::v5::pubcomp{};
  750. }
  751. }
  752. template<class Message, class Response>
  753. inline void _init_subscribe_response(
  754. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  755. {
  756. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  757. mqtt::version ver = caller->version();
  758. if /**/ (ver == mqtt::version::v3)
  759. {
  760. rep = mqtt::v3::suback{};
  761. }
  762. else if (ver == mqtt::version::v4)
  763. {
  764. rep = mqtt::v4::suback{};
  765. }
  766. else if (ver == mqtt::version::v5)
  767. {
  768. rep = mqtt::v5::suback{};
  769. }
  770. }
  771. template<class Message, class Response>
  772. inline void _init_unsubscribe_response(
  773. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  774. {
  775. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  776. mqtt::version ver = caller->version();
  777. if /**/ (ver == mqtt::version::v3)
  778. {
  779. rep = mqtt::v3::unsuback{};
  780. }
  781. else if (ver == mqtt::version::v4)
  782. {
  783. rep = mqtt::v4::unsuback{};
  784. }
  785. else if (ver == mqtt::version::v5)
  786. {
  787. rep = mqtt::v5::unsuback{};
  788. }
  789. }
  790. template<class Message, class Response>
  791. inline void _init_pingreq_response(
  792. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  793. {
  794. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  795. mqtt::version ver = caller->version();
  796. if /**/ (ver == mqtt::version::v3)
  797. {
  798. rep = mqtt::v3::pingresp{};
  799. }
  800. else if (ver == mqtt::version::v4)
  801. {
  802. rep = mqtt::v4::pingresp{};
  803. }
  804. else if (ver == mqtt::version::v5)
  805. {
  806. rep = mqtt::v5::pingresp{};
  807. }
  808. }
  809. template<class Message, class Response>
  810. inline void _init_auth_response(
  811. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, Message& msg, Response& rep)
  812. {
  813. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  814. mqtt::version ver = caller->version();
  815. if /**/ (ver == mqtt::version::v3)
  816. {
  817. rep = mqtt::v3::connack{};
  818. }
  819. else if (ver == mqtt::version::v4)
  820. {
  821. rep = mqtt::v4::connack{};
  822. }
  823. else if (ver == mqtt::version::v5)
  824. {
  825. rep = mqtt::v5::auth{};
  826. }
  827. }
  828. template<class Message, class Response>
  829. typename std::enable_if_t<mqtt::is_rawmsg<typename detail::remove_cvref_t<Message>>()>
  830. inline _init_response(error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  831. Message& msg, Response& rep)
  832. {
  833. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  834. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  835. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  836. if constexpr (std::is_same_v<response_type, mqtt::message>)
  837. {
  838. if /**/ constexpr (
  839. std::is_same_v<message_type, mqtt::v3::connect> ||
  840. std::is_same_v<message_type, mqtt::v4::connect> ||
  841. std::is_same_v<message_type, mqtt::v5::connect>)
  842. {
  843. this->_init_connect_response(ec, caller_ptr, caller, msg, rep);
  844. }
  845. else if constexpr (
  846. std::is_same_v<message_type, mqtt::v3::publish> ||
  847. std::is_same_v<message_type, mqtt::v4::publish> ||
  848. std::is_same_v<message_type, mqtt::v5::publish>)
  849. {
  850. this->_init_publish_response(ec, caller_ptr, caller, msg, rep);
  851. }
  852. else if constexpr (
  853. std::is_same_v<message_type, mqtt::v3::pubrec> ||
  854. std::is_same_v<message_type, mqtt::v4::pubrec> ||
  855. std::is_same_v<message_type, mqtt::v5::pubrec>)
  856. {
  857. this->_init_pubrec_response(ec, caller_ptr, caller, msg, rep);
  858. }
  859. else if constexpr (
  860. std::is_same_v<message_type, mqtt::v3::pubrel> ||
  861. std::is_same_v<message_type, mqtt::v4::pubrel> ||
  862. std::is_same_v<message_type, mqtt::v5::pubrel>)
  863. {
  864. this->_init_pubrel_response(ec, caller_ptr, caller, msg, rep);
  865. }
  866. else if constexpr (
  867. std::is_same_v<message_type, mqtt::v3::subscribe> ||
  868. std::is_same_v<message_type, mqtt::v4::subscribe> ||
  869. std::is_same_v<message_type, mqtt::v5::subscribe>)
  870. {
  871. this->_init_subscribe_response(ec, caller_ptr, caller, msg, rep);
  872. }
  873. else if constexpr (
  874. std::is_same_v<message_type, mqtt::v3::unsubscribe> ||
  875. std::is_same_v<message_type, mqtt::v4::unsubscribe> ||
  876. std::is_same_v<message_type, mqtt::v5::unsubscribe>)
  877. {
  878. this->_init_unsubscribe_response(ec, caller_ptr, caller, msg, rep);
  879. }
  880. else if constexpr (
  881. std::is_same_v<message_type, mqtt::v3::pingreq> ||
  882. std::is_same_v<message_type, mqtt::v4::pingreq> ||
  883. std::is_same_v<message_type, mqtt::v5::pingreq>)
  884. {
  885. this->_init_pingreq_response(ec, caller_ptr, caller, msg, rep);
  886. }
  887. else if constexpr (
  888. std::is_same_v<message_type, mqtt::v5::auth>)
  889. {
  890. this->_init_auth_response(ec, caller_ptr, caller, msg, rep);
  891. }
  892. else
  893. {
  894. std::ignore = true;
  895. }
  896. }
  897. else
  898. {
  899. std::ignore = true;
  900. }
  901. }
  902. template<class Message, class Response>
  903. inline void _send_mqtt_message_response(
  904. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  905. Message& msg, Response&& rep)
  906. {
  907. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  908. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  909. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  910. if (rep.empty())
  911. return;
  912. bool sendflag = true;
  913. std::visit([&sendflag](auto& rep) mutable { sendflag = rep.get_send_flag(); }, rep.variant());
  914. if (sendflag == false)
  915. return;
  916. // can't use async_send, beacuse the caller maybe not started yet
  917. caller->push_event([caller_ptr, caller, id = caller->life_id(), rep = std::forward<Response>(rep)]
  918. (event_queue_guard<caller_t> g) mutable
  919. {
  920. detail::ignore_unused(caller_ptr);
  921. if (id != caller->life_id())
  922. {
  923. set_last_error(asio::error::operation_aborted);
  924. return;
  925. }
  926. std::visit([caller, g = std::move(g)](auto& pr) mutable
  927. {
  928. #if !defined(ASIO2_HEADER_ONLY) && __has_include(<boost/core/type_name.hpp>)
  929. ASIO2_LOG_DEBUG("mqtt send {}", boost::core::type_name<decltype(pr)>());
  930. #else
  931. ASIO2_LOG_DEBUG("mqtt send {}", bho::core::type_name<decltype(pr)>());
  932. #endif
  933. caller->_do_send(pr, [g = std::move(g)](const error_code&, std::size_t) mutable {});
  934. }, rep.variant());
  935. });
  936. }
  937. template<class Message, class Response>
  938. inline void _send_mqtt_packet_response(
  939. error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  940. Message& msg, Response&& rep)
  941. {
  942. detail::ignore_unused(ec, caller_ptr, caller, msg, rep);
  943. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  944. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  945. if (rep.get_send_flag() == false)
  946. return;
  947. // can't use async_send, beacuse the caller maybe not started yet
  948. caller->push_event([caller_ptr, caller, id = caller->life_id(), rep = std::forward<Response>(rep)]
  949. (event_queue_guard<caller_t> g) mutable
  950. {
  951. detail::ignore_unused(caller_ptr);
  952. if (id != caller->life_id())
  953. {
  954. set_last_error(asio::error::operation_aborted);
  955. return;
  956. }
  957. #if !defined(ASIO2_HEADER_ONLY) && __has_include(<boost/core/type_name.hpp>)
  958. ASIO2_LOG_DEBUG("mqtt send {}", boost::core::type_name<decltype(rep)>());
  959. #else
  960. ASIO2_LOG_DEBUG("mqtt send {}", bho::core::type_name<decltype(rep)>());
  961. #endif
  962. caller->_do_send(rep, [g = std::move(g)](const error_code&, std::size_t) mutable {});
  963. });
  964. }
  965. template<class Message, class Response>
  966. inline void _send_mqtt_response(error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  967. Message& msg, Response&& rep)
  968. {
  969. using message_type [[maybe_unused]] = typename detail::remove_cvref_t<Message>;
  970. using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;
  971. if constexpr (std::is_same_v<response_type, mqtt::message>)
  972. {
  973. this->_send_mqtt_message_response(ec, caller_ptr, caller, msg, std::forward<Response>(rep));
  974. }
  975. else
  976. {
  977. this->_send_mqtt_packet_response(ec, caller_ptr, caller, msg, std::forward<Response>(rep));
  978. }
  979. }
  980. inline void _call_mqtt_handler(mqtt::control_packet_type type, error_code& ec,
  981. std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, std::string_view& data)
  982. {
  983. ASIO2_ASSERT(caller->io_->running_in_this_thread());
  984. std::shared_ptr<handler_type> p;
  985. {
  986. asio2::shared_locker g(this->mqtt_invoker_mutex_);
  987. if (detail::to_underlying(type) < this->mqtt_handlers_.size())
  988. {
  989. p = this->mqtt_handlers_[detail::to_underlying(type)];
  990. }
  991. }
  992. if (p && (*p))
  993. {
  994. (*p)(ec, caller_ptr, caller, data);
  995. }
  996. else
  997. {
  998. // Should't run to here.
  999. ASIO2_ASSERT(false);
  1000. ec = mqtt::make_error_code(mqtt::error::malformed_packet);
  1001. this->_handle_mqtt_error(ec, caller_ptr, caller);
  1002. return;
  1003. }
  1004. }
  1005. template<class CallerT = caller_t>
  1006. inline void _handle_mqtt_error(error_code& ec, std::shared_ptr<CallerT>& caller_ptr, CallerT* caller)
  1007. {
  1008. if constexpr (CallerT::is_client())
  1009. {
  1010. return;
  1011. }
  1012. else
  1013. {
  1014. if (!ec)
  1015. return;
  1016. // post a async event to disconnect, don't call _do_disconnect directly,
  1017. // otherwise the client's bind_disconnect callback maybe can't be called.
  1018. asio::post(caller->io_->context(), make_allocator(caller->wallocator(),
  1019. [ec, caller_ptr, caller]() mutable
  1020. {
  1021. if (caller->state_ == state_t::started)
  1022. {
  1023. caller->_do_disconnect(ec, std::move(caller_ptr));
  1024. }
  1025. }));
  1026. }
  1027. }
  1028. inline std::shared_ptr<handler_type> _find_mqtt_handler(mqtt::control_packet_type type)
  1029. {
  1030. asio2::shared_locker g(this->mqtt_invoker_mutex_);
  1031. if (detail::to_underlying(type) < this->mqtt_handlers_.size())
  1032. {
  1033. std::shared_ptr<handler_type> p = mqtt_handlers_[detail::to_underlying(type)];
  1034. if (p && (*p))
  1035. return p;
  1036. }
  1037. return nullptr;
  1038. }
  1039. protected:
  /// A read/write lock is used to make access to the handler table thread safe.
  1041. mutable asio2::shared_mutexer mqtt_invoker_mutex_;
  // magic_enum has a bug: it may return 0 under WSL Ubuntu, so the array size
  // is computed from the underlying value of control_packet_type::auth instead.
  1043. std::array<std::shared_ptr<handler_type>, detail::to_underlying(mqtt::control_packet_type::auth) + 1>
  1044. mqtt_handlers_ ASIO2_GUARDED_BY(mqtt_invoker_mutex_);
  1045. };
  1046. }
  1047. #endif // !__ASIO2_MQTT_INVOKER_HPP__