mqtt_shared_target.hpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. *
  10. * referenced from : mqtt_cpp/include/mqtt/broker/shared_target.hpp
  11. */
  12. #ifndef __ASIO2_MQTT_SHARED_TARGET_HPP__
  13. #define __ASIO2_MQTT_SHARED_TARGET_HPP__
  14. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #pragma once
  16. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include <asio2/base/detail/shared_mutex.hpp>
#include <asio2/mqtt/detail/mqtt_topic_util.hpp>
  30. namespace asio2::mqtt
  31. {
  32. // v is shared target node
  33. template<typename STNode>
  34. auto round_shared_target_method(STNode& v) ->
  35. std::shared_ptr<typename asio2::detail::remove_cvref_t<STNode>::session_type>
  36. {
  37. using session_type = typename asio2::detail::remove_cvref_t<STNode>::session_type;
  38. std::weak_ptr<session_type> session;
  39. if (!v.session_map.empty())
  40. {
  41. if (v.last == v.session_map.end())
  42. v.last = v.session_map.begin();
  43. else
  44. {
  45. v.last = std::next(v.last);
  46. if (v.last == v.session_map.end())
  47. v.last = v.session_map.begin();
  48. }
  49. session = v.last->second;
  50. }
  51. return session.lock();
  52. }
  53. template<typename STNode>
  54. class shared_target
  55. {
  56. public:
  57. struct hasher
  58. {
  59. inline std::size_t operator()(std::pair<std::string_view, std::string_view> const& pair) const noexcept
  60. {
  61. std::size_t v = asio2::detail::fnv1a_hash<std::size_t>(
  62. (const unsigned char*)(pair.first.data()), pair.first.size());
  63. return asio2::detail::fnv1a_hash<std::size_t>(v,
  64. (const unsigned char*)(pair.second.data()), pair.second.size());
  65. }
  66. };
  67. shared_target()
  68. {
  69. set_policy(std::bind(round_shared_target_method<STNode>, std::placeholders::_1));
  70. }
  71. ~shared_target() = default;
  72. using session_t = typename STNode::session_type;
  73. using session_type = typename STNode::session_type;
  74. template<class Function>
  75. inline shared_target& set_policy(Function&& fun)
  76. {
  77. asio2::unique_locker g(this->shared_target_mutex_);
  78. policy_ = std::forward<Function>(fun);
  79. return (*this);
  80. }
  81. public:
  82. void insert(std::shared_ptr<session_t>& session, std::string_view share_name, std::string_view topic_filter)
  83. {
  84. auto key = std::pair{ share_name, topic_filter };
  85. asio2::unique_locker g(this->shared_target_mutex_);
  86. auto it = targets_.find(key);
  87. if (it == targets_.end())
  88. {
  89. STNode v{ share_name, topic_filter };
  90. auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
  91. std::chrono::steady_clock::now().time_since_epoch()).count();
  92. session->shared_target_key_ = ns;
  93. v.session_map.emplace(ns, session);
  94. v.session_set.emplace(session.get());
  95. key = std::pair{ v.share_name_view(), v.topic_filter_view() };
  96. it = targets_.emplace(std::move(key), std::move(v)).first;
  97. }
  98. else
  99. {
  100. STNode& v = it->second;
  101. if (v.session_set.find(session.get()) != v.session_set.end())
  102. return;
  103. for (;;)
  104. {
  105. auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
  106. std::chrono::steady_clock::now().time_since_epoch()).count();
  107. auto it_map = v.session_map.find(ns);
  108. if (it_map != v.session_map.end())
  109. {
  110. std::this_thread::yield();
  111. continue;
  112. }
  113. session->shared_target_key_ = ns;
  114. v.session_map.emplace(ns, session);
  115. v.session_set.emplace(session.get());
  116. break;
  117. }
  118. }
  119. }
  120. void erase(std::shared_ptr<session_t>& session, std::string_view share_name, std::string_view topic_filter)
  121. {
  122. auto key = std::pair{ share_name, topic_filter };
  123. asio2::unique_locker g(this->shared_target_mutex_);
  124. auto it = targets_.find(key);
  125. if (it == targets_.end())
  126. return;
  127. STNode& v = it->second;
  128. auto it_map = v.session_map.find(session->shared_target_key_);
  129. if (it_map != v.session_map.end())
  130. {
  131. if (v.last == it_map)
  132. {
  133. if (it_map != v.session_map.begin())
  134. v.last = std::prev(v.last);
  135. else
  136. v.last = std::next(v.last);
  137. }
  138. v.session_map.erase(it_map);
  139. }
  140. auto it_set = v.session_set.find(session.get());
  141. if (it_set != v.session_set.end())
  142. {
  143. v.session_set.erase(it_set);
  144. }
  145. }
  146. //void erase(std::shared_ptr<session_t>& session, std::set<std::string_view> share_names)
  147. //{
  148. // for (std::string_view share_name : share_names)
  149. // {
  150. // auto it = targets_.find(share_name);
  151. // if (it == targets_.end())
  152. // continue;
  153. // std::unordered_map<std::string_view, entry>& map_inner = it->second;
  154. // auto it_map = map_inner.find(session->client_id());
  155. // if (it_map == map_inner.end())
  156. // continue;
  157. // map_inner.erase(it_map);
  158. // }
  159. //}
  160. std::shared_ptr<session_t> get_target(std::string_view share_name, std::string_view topic_filter)
  161. {
  162. auto key = std::pair{ share_name, topic_filter };
  163. asio2::unique_locker g(this->shared_target_mutex_);
  164. auto it = targets_.find(key);
  165. if (it == targets_.end())
  166. return std::shared_ptr<session_t>();
  167. STNode& v = it->second;
  168. return policy_(v);
  169. }
  170. protected:
  171. /// use rwlock to make thread safe
  172. mutable asio2::shared_mutexer shared_target_mutex_;
  173. /// key : share_name - topic_filter, val : shared target node
  174. std::unordered_map<std::pair<std::string_view, std::string_view>, STNode, hasher> targets_ ASIO2_GUARDED_BY(shared_target_mutex_);
  175. std::function<std::shared_ptr<session_type>(STNode&)> policy_ ASIO2_GUARDED_BY(shared_target_mutex_);
  176. };
  177. template<class session_t>
  178. struct stnode
  179. {
  180. template <class> friend class mqtt::shared_target;
  181. using session_type = session_t;
  182. explicit stnode(std::string_view _share_name, std::string_view _topic_filter)
  183. {
  184. share_name.resize(_share_name.size());
  185. std::memcpy((void*)share_name.data(), (const void*)_share_name.data(), _share_name.size());
  186. topic_filter.resize(_topic_filter.size());
  187. std::memcpy((void*)topic_filter.data(), (const void*)_topic_filter.data(), _topic_filter.size());
  188. last = session_map.end();
  189. }
  190. inline std::string_view share_name_view()
  191. {
  192. return std::string_view{ share_name.data(), share_name.size() };
  193. }
  194. inline std::string_view topic_filter_view()
  195. {
  196. return std::string_view{ topic_filter.data(), topic_filter.size() };
  197. }
  198. std::vector<char> share_name ; // vector has no SSO
  199. std::vector<char> topic_filter;
  200. /// session map ordered by steady_clock
  201. std::map<std::chrono::nanoseconds::rep, std::weak_ptr<session_t>> session_map;
  202. /// session unique
  203. std::set<session_t*> session_set;
  204. /// last session for shared subscribe
  205. typename std::map<std::chrono::nanoseconds::rep, std::weak_ptr<session_t>>::iterator last;
  206. };
  207. }
  208. #endif // !__ASIO2_MQTT_SHARED_TARGET_HPP__