mqtt_client.hpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_MQTT_CLIENT_HPP__
  11. #define __ASIO2_MQTT_CLIENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/base/detail/shared_mutex.hpp>
  17. #include <asio2/tcp/tcp_client.hpp>
  18. #include <asio2/mqtt/impl/mqtt_send_connect_op.hpp>
  19. #include <asio2/mqtt/impl/mqtt_send_op.hpp>
  20. #include <asio2/mqtt/detail/mqtt_handler.hpp>
  21. #include <asio2/mqtt/detail/mqtt_invoker.hpp>
  22. #include <asio2/mqtt/detail/mqtt_topic_alias.hpp>
  23. #include <asio2/mqtt/detail/mqtt_session_state.hpp>
  24. #include <asio2/mqtt/detail/mqtt_message_router.hpp>
  25. #include <asio2/mqtt/detail/mqtt_subscribe_router.hpp>
  26. #include <asio2/mqtt/options.hpp>
  27. #include <asio2/util/uuid.hpp>
  28. namespace asio2::detail
  29. {
  30. struct template_args_mqtt_client : public template_args_tcp_client
  31. {
  32. static constexpr bool rdc_call_cp_enabled = false;
  33. template<class caller_t>
  34. struct subnode
  35. {
  36. explicit subnode(
  37. std::weak_ptr<caller_t> c,
  38. mqtt::subscription s,
  39. mqtt::v5::properties_set p = mqtt::v5::properties_set{}
  40. )
  41. : caller(std::move(c))
  42. , sub (std::move(s))
  43. , props (std::move(p))
  44. {
  45. }
  46. inline std::string_view share_name () { return sub.share_name (); }
  47. inline std::string_view topic_filter() { return sub.topic_filter(); }
  48. //
  49. std::weak_ptr<caller_t> caller;
  50. // subscription info
  51. mqtt::subscription sub;
  52. // subscription properties
  53. mqtt::v5::properties_set props;
  54. detail::function<void(mqtt::message&)> callback;
  55. };
  56. };
  57. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  58. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  59. ASIO2_CLASS_FORWARD_DECLARE_TCP_CLIENT;
  60. template<class derived_t, class args_t = template_args_mqtt_client>
  61. class mqtt_client_impl_t
  62. : public tcp_client_impl_t <derived_t, args_t>
  63. , public mqtt_options
  64. , public mqtt_handler_t <derived_t, args_t>
  65. , public mqtt_invoker_t <derived_t, args_t>
  66. , public mqtt_message_router_t <derived_t, args_t>
  67. , public mqtt_subscribe_router_t<derived_t, args_t>
  68. , public mqtt_topic_alias_t <derived_t, args_t>
  69. , public mqtt_send_op <derived_t, args_t>
  70. , public mqtt::session_state
  71. {
  72. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  73. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  74. ASIO2_CLASS_FRIEND_DECLARE_TCP_CLIENT;
  75. public:
  76. using super = tcp_client_impl_t <derived_t, args_t>;
  77. using self = mqtt_client_impl_t<derived_t, args_t>;
  78. using args_type = args_t;
  79. using subnode_type = typename args_type::template subnode<derived_t>;
  80. using super::send;
  81. using super::async_send;
  82. public:
  83. /**
  84. * @brief constructor
  85. */
  86. explicit mqtt_client_impl_t(
  87. std::size_t init_buf_size = tcp_frame_size,
  88. std::size_t max_buf_size = mqtt::max_payload,
  89. std::size_t concurrency = 1
  90. )
  91. : super(init_buf_size, max_buf_size, concurrency)
  92. , mqtt_options ()
  93. , mqtt_handler_t <derived_t, args_t>()
  94. , mqtt_invoker_t <derived_t, args_t>()
  95. , mqtt_message_router_t <derived_t, args_t>()
  96. , mqtt_subscribe_router_t<derived_t, args_t>()
  97. , mqtt_topic_alias_t <derived_t, args_t>()
  98. , mqtt_send_op <derived_t, args_t>()
  99. , pingreq_timer_(this->io_->context())
  100. {
  101. }
  102. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  103. explicit mqtt_client_impl_t(
  104. std::size_t init_buf_size,
  105. std::size_t max_buf_size,
  106. Scheduler&& scheduler
  107. )
  108. : super(init_buf_size, max_buf_size, std::forward<Scheduler>(scheduler))
  109. , mqtt_options ()
  110. , mqtt_handler_t <derived_t, args_t>()
  111. , mqtt_invoker_t <derived_t, args_t>()
  112. , mqtt_message_router_t <derived_t, args_t>()
  113. , mqtt_subscribe_router_t<derived_t, args_t>()
  114. , mqtt_topic_alias_t <derived_t, args_t>()
  115. , mqtt_send_op <derived_t, args_t>()
  116. , pingreq_timer_(this->io_->context())
  117. {
  118. }
  119. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  120. explicit mqtt_client_impl_t(Scheduler&& scheduler)
  121. : mqtt_client_impl_t(tcp_frame_size, mqtt::max_payload, std::forward<Scheduler>(scheduler))
  122. {
  123. }
  124. /**
  125. * @brief destructor
  126. */
  127. ~mqtt_client_impl_t()
  128. {
  129. this->stop();
  130. }
  131. /**
  132. * @brief start the client, blocking connect to server
  133. * @param host - A string identifying a location. May be a descriptive name or
  134. * a numeric address string.
  135. * @param port - A string identifying the requested service. This may be a
  136. * descriptive name or a numeric string corresponding to a port number.
  137. */
  138. template<typename String, typename StrOrInt, typename... Args>
  139. inline bool start(String&& host, StrOrInt&& port, Args&&... args)
  140. {
  141. if constexpr (sizeof...(Args) > std::size_t(0))
  142. return this->derived().template _do_connect_with_connect_message<false>(
  143. std::forward<String>(host), std::forward<StrOrInt>(port),
  144. std::forward<Args>(args)...);
  145. else
  146. return this->derived().template _do_connect<false>(
  147. std::forward<String>(host), std::forward<StrOrInt>(port),
  148. ecs_helper::make_ecs(asio::transfer_at_least(1),
  149. mqtt::mqtt_match_role, std::forward<Args>(args)...));
  150. }
  151. /**
  152. * @brief start the client, asynchronous connect to server
  153. * @param host - A string identifying a location. May be a descriptive name or
  154. * a numeric address string.
  155. * @param port - A string identifying the requested service. This may be a
  156. * descriptive name or a numeric string corresponding to a port number.
  157. */
  158. template<typename String, typename StrOrInt, typename... Args>
  159. inline bool async_start(String&& host, StrOrInt&& port, Args&&... args)
  160. {
  161. if constexpr (sizeof...(Args) > std::size_t(0))
  162. return this->derived().template _do_connect_with_connect_message<true>(
  163. std::forward<String>(host), std::forward<StrOrInt>(port),
  164. std::forward<Args>(args)...);
  165. else
  166. return this->derived().template _do_connect<true>(
  167. std::forward<String>(host), std::forward<StrOrInt>(port),
  168. ecs_helper::make_ecs(asio::transfer_at_least(1),
  169. mqtt::mqtt_match_role, std::forward<Args>(args)...));
  170. }
  171. public:
  172. /**
  173. * @brief get the mqtt version number
  174. */
  175. inline mqtt::version version() const
  176. {
  177. return this->get_version();
  178. }
  179. /**
  180. * @brief get the mqtt version number
  181. */
  182. inline mqtt::version get_version() const
  183. {
  184. if /**/ (std::holds_alternative<mqtt::v3::connect>(connect_message_.base()))
  185. {
  186. return mqtt::version::v3;
  187. }
  188. else if (std::holds_alternative<mqtt::v4::connect>(connect_message_.base()))
  189. {
  190. return mqtt::version::v4;
  191. }
  192. else if (std::holds_alternative<mqtt::v5::connect>(connect_message_.base()))
  193. {
  194. return mqtt::version::v5;
  195. }
  196. ASIO2_ASSERT(false);
  197. return static_cast<mqtt::version>(0);
  198. }
  199. /**
  200. * @brief get the mqtt client identifier
  201. */
  202. inline std::string_view client_id() const
  203. {
  204. return this->get_client_id();
  205. }
  206. /**
  207. * @brief get the mqtt client identifier
  208. */
  209. inline std::string_view get_client_id() const
  210. {
  211. std::string_view v{};
  212. if (!this->connect_message_.empty())
  213. {
  214. if /**/ (std::holds_alternative<mqtt::v3::connect>(connect_message_.base()))
  215. {
  216. v = connect_message_.template get<mqtt::v3::connect>().client_id();
  217. }
  218. else if (std::holds_alternative<mqtt::v4::connect>(connect_message_.base()))
  219. {
  220. v = connect_message_.template get<mqtt::v4::connect>().client_id();
  221. }
  222. else if (std::holds_alternative<mqtt::v5::connect>(connect_message_.base()))
  223. {
  224. v = connect_message_.template get<mqtt::v5::connect>().client_id();
  225. }
  226. }
  227. if (v.empty())
  228. {
  229. if (const mqtt::v5::connack* m = std::get_if<mqtt::v5::connack>(std::addressof(connack_message_.base())))
  230. {
  231. const mqtt::v5::assigned_client_identifier* p =
  232. m->properties().get_if<mqtt::v5::assigned_client_identifier>();
  233. if (p)
  234. v = p->value();
  235. }
  236. }
  237. return v;
  238. }
  239. /**
  240. * @brief get the mqtt Keep Alive which is a time interval measured in seconds.
  241. */
  242. inline std::uint16_t keep_alive_time() const
  243. {
  244. return this->get_keep_alive_time();
  245. }
  246. /**
  247. * @brief get the mqtt Keep Alive which is a time interval measured in seconds.
  248. */
  249. inline std::uint16_t get_keep_alive_time() const
  250. {
  251. //The Keep Alive is a Two Byte Integer which is a time interval measured in seconds.
  252. // It is the maximum time interval that is permitted to elapse between the point at
  253. // which the Client finishes transmitting one MQTT Control Packet and the point it
  254. // starts sending the next. It is the responsibility of the Client to ensure that
  255. // the interval between MQTT Control Packets being sent does not exceed the Keep
  256. // Alive value. If Keep Alive is non-zero and in the absence of sending any other
  257. // MQTT Control Packets, the Client MUST send a PINGREQ packet [MQTT-3.1.2-20].
  258. // If the Server returns a Server Keep Alive on the CONNACK packet, the Client MUST
  259. // use that value instead of the value it sent as the Keep Alive [MQTT-3.1.2-21].
  260. if (const mqtt::v5::connack* m = std::get_if<mqtt::v5::connack>(std::addressof(connack_message_.base())))
  261. {
  262. const mqtt::v5::server_keep_alive* p =
  263. m->properties().get_if<mqtt::v5::server_keep_alive>();
  264. if (p)
  265. return p->value();
  266. }
  267. // Default to 60 seconds
  268. std::uint16_t v = 60;
  269. if (!this->connect_message_.empty())
  270. {
  271. if /**/ (std::holds_alternative<mqtt::v3::connect>(connect_message_.base()))
  272. {
  273. v = this->connect_message_.template get_if<mqtt::v3::connect>()->keep_alive();
  274. }
  275. else if (std::holds_alternative<mqtt::v4::connect>(connect_message_.base()))
  276. {
  277. v = this->connect_message_.template get_if<mqtt::v4::connect>()->keep_alive();
  278. }
  279. else if (std::holds_alternative<mqtt::v5::connect>(connect_message_.base()))
  280. {
  281. v = this->connect_message_.template get_if<mqtt::v5::connect>()->keep_alive();
  282. }
  283. }
  284. return v;
  285. }
  286. /**
  287. * @brief set the mqtt connect message packet
  288. */
  289. template<class Message>
  290. inline derived_t& set_connect_message(Message&& connect_msg)
  291. {
  292. using msg_type = typename detail::remove_cvref_t<Message>;
  293. if constexpr (
  294. std::is_same_v<msg_type, mqtt::v3::connect> ||
  295. std::is_same_v<msg_type, mqtt::v4::connect> ||
  296. std::is_same_v<msg_type, mqtt::v5::connect>)
  297. {
  298. this->connect_message_ = std::forward<Message>(connect_msg);
  299. }
  300. else
  301. {
  302. static_assert(detail::always_false_v<Message>);
  303. }
  304. return (static_cast<derived_t&>(*this));
  305. }
  306. /**
  307. * @brief get the mqtt connect message reference
  308. */
  309. inline mqtt::message& get_connect_message() { return this->connect_message_; }
  310. /**
  311. * @brief get the mqtt connect message reference
  312. */
  313. inline mqtt::message const& get_connect_message() const { return this->connect_message_; }
  314. /**
  315. * @brief get the mqtt connect message packet reference
  316. */
  317. template<mqtt::version v>
  318. inline auto& get_connect_packet()
  319. {
  320. if constexpr /**/ (mqtt::version::v3 == v)
  321. {
  322. return std::get<mqtt::v3::connect>(this->connect_message_.base());
  323. }
  324. else if constexpr (mqtt::version::v4 == v)
  325. {
  326. return std::get<mqtt::v4::connect>(this->connect_message_.base());
  327. }
  328. else if constexpr (mqtt::version::v5 == v)
  329. {
  330. return std::get<mqtt::v5::connect>(this->connect_message_.base());
  331. }
  332. else
  333. {
  334. static_assert(mqtt::version::v3 == v || mqtt::version::v4 == v || mqtt::version::v5 == v);
  335. }
  336. }
  337. protected:
  338. template<bool IsAsync, typename String, typename StrOrInt, typename Arg1, typename... Args>
  339. bool _do_connect_with_connect_message(String&& host, StrOrInt&& port, Arg1&& arg1, Args&&... args)
  340. {
  341. using arg1_type = typename detail::remove_cvref_t<Arg1>;
  342. if constexpr (
  343. std::is_same_v<arg1_type, mqtt::v3::connect> ||
  344. std::is_same_v<arg1_type, mqtt::v4::connect> ||
  345. std::is_same_v<arg1_type, mqtt::v5::connect>)
  346. {
  347. this->connect_message_ = std::forward<Arg1>(arg1);
  348. return this->derived().template _do_connect<IsAsync>(
  349. std::forward<String>(host), std::forward<StrOrInt>(port),
  350. ecs_helper::make_ecs(asio::transfer_at_least(1),
  351. mqtt::mqtt_match_role, std::forward<Args>(args)...));
  352. }
  353. else
  354. {
  355. return this->derived().template _do_connect<IsAsync>(
  356. std::forward<String>(host), std::forward<StrOrInt>(port),
  357. ecs_helper::make_ecs(asio::transfer_at_least(1),
  358. mqtt::mqtt_match_role, std::forward<Arg1>(arg1), std::forward<Args>(args)...));
  359. }
  360. }
  361. template<bool IsAsync, typename String, typename StrOrInt, typename C>
  362. inline bool _do_connect(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
  363. {
  364. if (!this->connect_message_.template holds<mqtt::v3::connect, mqtt::v4::connect, mqtt::v5::connect>())
  365. {
  366. ASIO2_ASSERT(false);
  367. set_last_error(asio::error::invalid_argument);
  368. return false;
  369. }
  370. return super::template _do_connect<IsAsync>(
  371. std::forward<String>(host), std::forward<StrOrInt>(port), std::move(ecs));
  372. }
  373. template<typename C>
  374. inline void _bind_default_mqtt_handler(std::shared_ptr<ecs_t<C>>& ecs)
  375. {
  376. detail::ignore_unused(ecs);
  377. // must set default callback for every mqtt message.
  378. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::connect ))) this->on_connect ([](mqtt::message& ) mutable {});
  379. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::connack ))) this->on_connack ([](mqtt::message& ) mutable {});
  380. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::publish ))) this->on_publish ([](mqtt::message&, mqtt::message&) mutable {});
  381. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::puback ))) this->on_puback ([](mqtt::message& ) mutable {});
  382. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pubrec ))) this->on_pubrec ([](mqtt::message&, mqtt::message&) mutable {});
  383. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pubrel ))) this->on_pubrel ([](mqtt::message&, mqtt::message&) mutable {});
  384. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pubcomp ))) this->on_pubcomp ([](mqtt::message& ) mutable {});
  385. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::subscribe ))) this->on_subscribe ([](mqtt::message& ) mutable {});
  386. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::suback ))) this->on_suback ([](mqtt::message& ) mutable {});
  387. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::unsubscribe))) this->on_unsubscribe([](mqtt::message& ) mutable {});
  388. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::unsuback ))) this->on_unsuback ([](mqtt::message& ) mutable {});
  389. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pingreq ))) this->on_pingreq ([](mqtt::message&, mqtt::message&) mutable {});
  390. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::pingresp ))) this->on_pingresp ([](mqtt::message& ) mutable {});
  391. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::disconnect ))) this->on_disconnect ([](mqtt::message& ) mutable {});
  392. if (!(this->_find_mqtt_handler(mqtt::control_packet_type::auth ))) this->on_auth ([](mqtt::message&, mqtt::message&) mutable {});
  393. }
  394. protected:
  395. template<typename C>
  396. inline void _do_init(std::shared_ptr<ecs_t<C>>& ecs)
  397. {
  398. // must set default callback for every mqtt message.
  399. this->derived()._bind_default_mqtt_handler(ecs);
  400. super::_do_init(ecs);
  401. }
  402. template<typename E = defer_event<void, derived_t>>
  403. inline void _do_disconnect(
  404. const error_code& ec, std::shared_ptr<derived_t> this_ptr, E chain = defer_event<void, derived_t>{})
  405. {
  406. state_t expected = state_t::started;
  407. if (this->derived().state_.compare_exchange_strong(expected, state_t::started))
  408. {
  409. mqtt::version ver = this->derived().version();
  410. if /**/ (ver == mqtt::version::v3)
  411. {
  412. mqtt::v3::disconnect disconnect;
  413. this->derived().internal_async_send(std::move(this_ptr), std::move(disconnect),
  414. [this, ec, e = chain.move_event()]
  415. (std::shared_ptr<derived_t> this_ptr, const error_code&,
  416. std::size_t, event_queue_guard<derived_t> g) mutable
  417. {
  418. defer_event chain(std::move(e), std::move(g));
  419. super::_do_disconnect(ec, std::move(this_ptr), std::move(chain));
  420. }, chain.move_guard());
  421. return;
  422. }
  423. else if (ver == mqtt::version::v4)
  424. {
  425. mqtt::v4::disconnect disconnect;
  426. this->derived().internal_async_send(std::move(this_ptr), std::move(disconnect),
  427. [this, ec, e = chain.move_event()]
  428. (std::shared_ptr<derived_t> this_ptr, const error_code&,
  429. std::size_t, event_queue_guard<derived_t> g) mutable
  430. {
  431. defer_event chain(std::move(e), std::move(g));
  432. super::_do_disconnect(ec, std::move(this_ptr), std::move(chain));
  433. }, chain.move_guard());
  434. return;
  435. }
  436. else if (ver == mqtt::version::v5)
  437. {
  438. mqtt::v5::disconnect disconnect;
  439. switch (ec.value())
  440. {
  441. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208
  442. case 0 : // Client or Server
  443. case 4 : // Client
  444. case 128 : // Client or Server
  445. case 129 : // Client or Server
  446. case 130 : // Client or Server
  447. case 131 : // Client or Server
  448. case 144 : // Client or Server
  449. case 147 : // Client or Server
  450. case 148 : // Client or Server
  451. case 149 : // Client or Server
  452. case 150 : // Client or Server
  453. case 151 : // Client or Server
  454. case 152 : // Client or Server
  455. case 153 : // Client or Server
  456. disconnect.reason_code(static_cast<std::uint8_t>(ec.value())); break;
  457. default: break;
  458. }
  459. this->derived().internal_async_send(std::move(this_ptr), std::move(disconnect),
  460. [this, ec, e = chain.move_event()]
  461. (std::shared_ptr<derived_t> this_ptr, const error_code&,
  462. std::size_t, event_queue_guard<derived_t> g) mutable
  463. {
  464. defer_event chain(std::move(e), std::move(g));
  465. super::_do_disconnect(ec, std::move(this_ptr), std::move(chain));
  466. }, chain.move_guard());
  467. return;
  468. }
  469. }
  470. super::_do_disconnect(ec, std::move(this_ptr), std::move(chain));
  471. }
  472. template<typename C, typename DeferEvent>
  473. inline void _handle_connect(
  474. const error_code& ec,
  475. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  476. {
  477. derived_t& derive = this->derived();
  478. set_last_error(ec);
  479. if (ec)
  480. {
  481. return derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  482. }
  483. // send connect message to server use coroutine
  484. mqtt_send_connect_op
  485. {
  486. derive.io_->context(),
  487. derive.connect_message_,
  488. derive.stream(),
  489. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  490. (error_code ec, std::unique_ptr<asio::streambuf> stream) mutable
  491. {
  492. derive._handle_mqtt_connect_response(ec, std::move(this_ptr), std::move(ecs),
  493. std::move(stream), std::move(chain));
  494. }
  495. };
  496. }
  497. template<typename C, typename DeferEvent>
  498. inline void _handle_mqtt_connect_response(
  499. error_code ec,
  500. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs,
  501. std::unique_ptr<asio::streambuf> stream, DeferEvent chain)
  502. {
  503. if (ec)
  504. {
  505. this->derived()._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  506. return;
  507. }
  508. std::string_view data{ reinterpret_cast<std::string_view::const_pointer>(
  509. static_cast<const char*>(stream->data().data())), stream->size() };
  510. mqtt::control_packet_type type = mqtt::message_type_from_data(data);
  511. bool valid_message = (type == mqtt::control_packet_type::connack) ||
  512. (this->derived().version() == mqtt::version::v5 && type == mqtt::control_packet_type::auth);
  513. // -- the connect_timeout_cp will disconnect after a reasonable amount of time.
  514. // If the client does not receive a CONNACK message from the server within a reasonable amount
  515. // of time, the client should close the TCP/IP socket connection,
  516. // and restart the session by opening a new socket to the server and issuing a CONNECT message.
  517. if (!valid_message)
  518. {
  519. ASIO2_ASSERT(false);
  520. ec = mqtt::make_error_code(mqtt::error::malformed_packet);
  521. this->derived()._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  522. return;
  523. }
  524. this->idmgr_.clear();
  525. ec = mqtt::make_error_code(mqtt::error::server_unavailable);
  526. this->derived()._call_mqtt_handler(type, ec, this_ptr, static_cast<derived_t*>(this), data);
  527. this->derived()._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  528. }
  529. template<typename C, typename DeferEvent>
  530. inline void _do_start(
  531. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  532. {
  533. super::_do_start(this_ptr, std::move(ecs), std::move(chain));
  534. this->derived()._post_pingreq_timer(
  535. std::move(this_ptr), std::chrono::seconds(this->derived().keep_alive_time()));
  536. }
  537. template<class Rep, class Period>
  538. inline void _post_pingreq_timer(
  539. std::shared_ptr<derived_t> this_ptr, std::chrono::duration<Rep, Period> duration)
  540. {
  541. derived_t& derive = this->derived();
  542. // start the timer
  543. if (duration > std::chrono::duration<Rep, Period>::zero() && this->is_started())
  544. {
  545. this->pingreq_timer_.expires_after(duration);
  546. this->pingreq_timer_.async_wait(
  547. [&derive, this_ptr = std::move(this_ptr)](const error_code& ec) mutable
  548. {
  549. derive._handle_pingreq_timer(ec, std::move(this_ptr));
  550. });
  551. }
  552. }
  553. inline void _handle_pingreq_timer(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  554. {
  555. derived_t& derive = this->derived();
  556. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  557. ASIO2_ASSERT((!ec) || ec == asio::error::operation_aborted);
  558. if (ec)
  559. return;
  560. // The Client can send PINGREQ at any time, irrespective of the Keep Alive value, and check
  561. // for a corresponding PINGRESP to determine that the network and the Server are available.
  562. // If the Keep Alive value is non-zero and the Server does not receive an MQTT Control Packet
  563. // from the Client within one and a half times the Keep Alive time period, it MUST close the
  564. // Network Connection to the Client as if the network had failed [MQTT-3.1.2-22].
  565. // If a Client does not receive a PINGRESP packet within a reasonable amount of time after it
  566. // has sent a PINGREQ, it SHOULD close the Network Connection to the Server.
  567. // A Keep Alive value of 0 has the effect of turning off the Keep Alive mechanism. If Keep Alive
  568. // is 0 the Client is not obliged to send MQTT Control Packets on any particular schedule.
  569. // send pingreq message, don't case the last sent and recved time.
  570. mqtt::version ver = derive.version();
  571. if /**/ (ver == mqtt::version::v3)
  572. {
  573. derive.internal_async_send(this_ptr, mqtt::v3::pingreq{});
  574. }
  575. else if (ver == mqtt::version::v4)
  576. {
  577. derive.internal_async_send(this_ptr, mqtt::v4::pingreq{});
  578. }
  579. else if (ver == mqtt::version::v5)
  580. {
  581. derive.internal_async_send(this_ptr, mqtt::v5::pingreq{});
  582. }
  583. // do next timer
  584. derive._post_pingreq_timer(std::move(this_ptr), std::chrono::seconds(derive.keep_alive_time()));
  585. }
  586. inline void _stop_pingreq_timer()
  587. {
  588. detail::cancel_timer(this->pingreq_timer_);
  589. }
  590. template<typename DeferEvent>
  591. inline void _handle_disconnect(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  592. {
  593. this->derived()._stop_pingreq_timer();
  594. super::_handle_disconnect(ec, std::move(this_ptr), std::move(chain));
  595. }
  596. template<typename DeferEvent>
  597. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  598. {
  599. super::_handle_stop(ec, std::move(this_ptr), std::move(chain));
  600. }
  601. template<class Data, class Callback>
  602. inline bool _do_send(Data& data, Callback&& callback)
  603. {
  604. return this->derived()._mqtt_send(data, std::forward<Callback>(callback));
  605. }
  606. protected:
  607. template<typename C>
  608. inline void _fire_recv(
  609. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
  610. {
  611. data = detail::call_data_filter_before_recv(this->derived(), data);
  612. this->listener_.notify(event_type::recv, data);
  613. this->derived()._rdc_handle_recv(this_ptr, ecs, data);
  614. mqtt::control_packet_type type = mqtt::message_type_from_data(data);
  615. if (type > mqtt::control_packet_type::auth)
  616. {
  617. ASIO2_ASSERT(false);
  618. // give a error callback and call it ?
  619. return;
  620. }
  621. error_code ec;
  622. this->derived()._call_mqtt_handler(type, ec, this_ptr, static_cast<derived_t*>(this), data);
  623. if (ec)
  624. {
  625. // give a error callback and call it ?
  626. }
  627. }
  628. protected:
  629. /// Should we set a default mqtt version to v4, default client id to a uuid string ?
  630. mqtt::message connect_message_{/* mqtt::v4::connect{ asio2::uuid().next().str() } */};
  631. ///
  632. mqtt::message connack_message_{};
  633. /// timer for pingreq
  634. asio::steady_timer pingreq_timer_;
  635. /// packet id manager
  636. mqtt::idmgr<std::atomic<mqtt::two_byte_integer::value_type>> idmgr_;
  637. };
  638. }
  639. namespace asio2
  640. {
  641. using mqtt_client_args = detail::template_args_mqtt_client;
  642. template<class derived_t, class args_t>
  643. using mqtt_client_impl_t = detail::mqtt_client_impl_t<derived_t, args_t>;
  644. template<class derived_t>
  645. class mqtt_client_t : public detail::mqtt_client_impl_t<derived_t, detail::template_args_mqtt_client>
  646. {
  647. public:
  648. using detail::mqtt_client_impl_t<derived_t, detail::template_args_mqtt_client>::mqtt_client_impl_t;
  649. };
  650. class mqtt_client : public mqtt_client_t<mqtt_client>
  651. {
  652. public:
  653. using mqtt_client_t<mqtt_client>::mqtt_client_t;
  654. };
  655. }
  656. #include <asio2/base/detail/pop_options.hpp>
  657. #endif // !__ASIO2_MQTT_CLIENT_HPP__