mqtt_offline_message.hpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. *
  10. * referenced from : mqtt_cpp/include/mqtt/broker/offline_message.hpp
  11. */
  12. #ifndef __ASIO2_MQTT_OFFLINE_MESSAGE_HPP__
  13. #define __ASIO2_MQTT_OFFLINE_MESSAGE_HPP__
  14. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #pragma once
  16. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  17. #include <cstdint>
  18. #include <string>
  19. #include <string_view>
  20. #include <type_traits>
  21. #include <list>
  22. #include <algorithm>
  23. #include <variant>
  24. #include <asio2/base/iopool.hpp>
  25. #include <asio2/mqtt/message.hpp>
  26. #include <asio2/mqtt/detail/mqtt_topic_util.hpp>
  27. namespace asio2::mqtt
  28. {
  29. // The offline message structure holds messages that have been published on a
  30. // topic that a not-currently-connected client is subscribed to.
  31. // When a new connection is made with the client id for this saved data,
  32. // these messages will be published to that client, and only that client.
  33. template<class Value>
  34. class offline_messages
  35. {
  36. public:
  37. offline_messages() = default;
  38. ~offline_messages() = default;
  39. //void send_all(endpoint_t& ep)
  40. //{
  41. // auto& idx = messages_.get<tag_seq>();
  42. // while (!idx.empty())
  43. // {
  44. // if (idx.front().send(ep))
  45. // {
  46. // idx.pop_front();
  47. // }
  48. // else
  49. // {
  50. // break;
  51. // }
  52. // }
  53. //}
  54. //void send_by_packet_id_release(endpoint_t& ep)
  55. //{
  56. // auto& idx = messages_.get<tag_seq>();
  57. // while (!idx.empty())
  58. // {
  59. // if (idx.front().send(ep))
  60. // {
  61. // // if packet_id is consumed, then finish
  62. // idx.pop_front();
  63. // }
  64. // else
  65. // {
  66. // break;
  67. // }
  68. // }
  69. //}
  70. //bool send(endpoint_t& ep) const
  71. //{
  72. // auto props = props_;
  73. // if (message_expiry_timer_)
  74. // {
  75. // auto d = std::chrono::duration_cast<std::chrono::seconds>(
  76. // message_expiry_timer_->expiry() - std::chrono::steady_clock::now()).count();
  77. // if (d < 0)
  78. // d = 0;
  79. // set_property<v5::property::message_expiry_interval>(
  80. // props,
  81. // v5::property::message_expiry_interval(
  82. // static_cast<uint32_t>(d)));
  83. // }
  84. // mqtt::qos_type qos_value = publish_.qos();
  85. // if (qos_value == mqtt::qos_type::at_least_once || qos_value == mqtt::qos_type::exactly_once)
  86. // {
  87. // if (auto pid = ep.acquire_unique_packet_id_no_except())
  88. // {
  89. // ep.publish(pid.value(), topic_, contents_, pubopts_, std::move(props));
  90. // return true;
  91. // }
  92. // }
  93. // else
  94. // {
  95. // ep.publish(topic_, contents_, pubopts_, std::move(props));
  96. // return true;
  97. // }
  98. // return false;
  99. //}
  100. void clear()
  101. {
  102. messages_.clear();
  103. }
  104. bool empty() const
  105. {
  106. return messages_.empty();
  107. }
  108. template<class Message>
  109. void push_back(asio::io_context& ioc, Message&& msg)
  110. {
  111. using message_type = typename asio2::detail::remove_cvref_t<Message>;
  112. auto it = messages_.emplace(messages_.end(), std::forward<Message>(msg), nullptr);
  113. if constexpr (std::is_same_v<message_type, mqtt::v5::publish>)
  114. {
  115. mqtt::v5::message_expiry_interval* mei =
  116. msg.properties().template get_if<mqtt::v5::message_expiry_interval>();
  117. if (mei)
  118. {
  119. std::shared_ptr<asio::steady_timer> expiry_timer =
  120. std::make_shared<asio::steady_timer>(ioc, std::chrono::seconds(mei->value()));
  121. expiry_timer->async_wait(
  122. [this, it, wp = std::weak_ptr<asio::steady_timer>(expiry_timer)](error_code ec) mutable
  123. {
  124. if (auto sp = wp.lock())
  125. {
  126. if (!ec)
  127. {
  128. messages_.erase(it);
  129. }
  130. }
  131. });
  132. it->message_expiry_timer = std::move(expiry_timer);
  133. }
  134. }
  135. else
  136. {
  137. asio2::ignore_unused(ioc, msg, it);
  138. }
  139. }
  140. template<class Callback>
  141. void for_each(Callback&& cb)
  142. {
  143. for (auto& v : messages_)
  144. {
  145. cb(v);
  146. }
  147. }
  148. protected:
  149. ///
  150. std::list<Value> messages_;
  151. };
  152. struct omnode
  153. {
  154. public:
  155. template<class Message>
  156. explicit omnode(Message&& msg, std::shared_ptr<asio::steady_timer> expiry_timer)
  157. : message(std::forward<Message>(msg))
  158. , message_expiry_timer(std::move(expiry_timer))
  159. {
  160. }
  161. mqtt::message message;
  162. std::shared_ptr<asio::steady_timer> message_expiry_timer;
  163. };
  164. }
  165. #endif // !__ASIO2_MQTT_OFFLINE_MESSAGE_HPP__