mqtt_subscription_map.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. *
  10. * referenced from : mqtt_cpp/include/mqtt/broker/subscription_map.hpp
  11. */
  12. #ifndef __ASIO2_MQTT_SUBSCRIPTION_MAP_HPP__
  13. #define __ASIO2_MQTT_SUBSCRIPTION_MAP_HPP__
  14. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #pragma once
  16. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  17. #include <cstdint>
  18. #include <string>
  19. #include <string_view>
  20. #include <type_traits>
  21. #include <unordered_set>
  22. #include <unordered_map>
  23. #include <algorithm>
  24. #include <optional>
  25. #include <vector>
  26. #include <cinttypes>
  27. #include <asio2/base/error.hpp>
  28. #include <asio2/base/detail/util.hpp>
  29. #include <asio2/base/detail/shared_mutex.hpp>
  30. #include <asio2/mqtt/detail/mqtt_topic_util.hpp>
  31. #include <asio2/mqtt/idmgr.hpp>
  32. #include <asio2/mqtt/message.hpp>
  33. namespace asio2::mqtt
  34. {
  35. template<
  36. class Key, // client id
  37. class Value, // subscribe data
  38. class Container = std::unordered_map<Key, Value>
  39. >
  40. class subscription_map
  41. {
  42. public:
  43. using key_type = std::pair<std::size_t, std::string_view>;
  44. struct hasher
  45. {
  46. inline std::size_t operator()(key_type const& pair) const noexcept
  47. {
  48. std::size_t v = asio2::detail::fnv1a_hash<std::size_t>(
  49. (const unsigned char*)(std::addressof(pair.first)), sizeof(std::size_t));
  50. return asio2::detail::fnv1a_hash<std::size_t>(v,
  51. (const unsigned char*)(pair.second.data()), pair.second.size());
  52. }
  53. };
  54. struct node
  55. {
  56. std::size_t id;
  57. std::size_t count = 1;
  58. bool has_plus = false;
  59. bool has_hash = false;
  60. key_type parent;
  61. std::vector<char> tokenize; // vector has no SSO
  62. Container subscribers;
  63. inline std::string_view tokenize_view()
  64. {
  65. return std::string_view{ tokenize.data(), tokenize.size() };
  66. }
  67. explicit node(std::size_t i, key_type p, std::string_view t) : id(i), parent(p)
  68. {
  69. tokenize.resize(t.size());
  70. std::memcpy((void*)tokenize.data(), (const void*)t.data(), t.size());
  71. }
  72. };
  73. using map_type = std::unordered_map<key_type, node, hasher>;
  74. using map_iterator = typename map_type::iterator;
  75. using map_const_iterator = typename map_type::const_iterator;
  76. public:
  77. subscription_map()
  78. {
  79. this->root_node_id_ = idmgr_.get();
  80. ASIO2_ASSERT(this->root_node_id_ == 1);
  81. map_.emplace(root_key_, node(this->root_node_id_, root_key_, ""));
  82. }
  83. // Return the number of registered topic filters
  84. inline std::size_t get_subscribe_count() const
  85. {
  86. return this->subscribe_count_;
  87. }
  88. template<typename Function>
  89. void match(std::string_view topic, Function&& callback)
  90. {
  91. std::vector<map_iterator> iters;
  92. asio2::shared_locker g(this->submap_mutex_);
  93. iters.emplace_back(this->get_root());
  94. topic_filter_tokenizer(topic, [this, &iters, &callback](std::string_view t) mutable
  95. {
  96. return this->match_subfun(iters, callback, t);
  97. });
  98. for (auto& it : iters)
  99. {
  100. for (auto&[k, v] : it->second.subscribers)
  101. {
  102. callback(k, v);
  103. }
  104. }
  105. }
  106. // Insert a key => value at the specified topic filter
  107. // returns the handle and true if key was inserted, false if key was updated
  108. template <typename K, typename V>
  109. std::pair<key_type, bool> insert_or_assign(std::string_view topic_filter, K&& key, V&& val)
  110. {
  111. asio2::unique_locker g(this->submap_mutex_);
  112. std::vector<map_iterator> iters = this->get_nodes_by_topic_filter(topic_filter);
  113. if (iters.empty())
  114. {
  115. iters = this->emplace(topic_filter);
  116. this->emplace_subscriber_node(key, iters.back()->first);
  117. iters.back()->second.subscribers.insert_or_assign(std::forward<K>(key), std::forward<V>(val));
  118. this->increase_subscribe_count();
  119. return std::pair(iters.back()->first, true);
  120. }
  121. else
  122. {
  123. auto& subscribers = iters.back()->second.subscribers;
  124. this->emplace_subscriber_node(key, iters.back()->first);
  125. auto[_1, inserted] = subscribers.insert_or_assign(std::forward<K>(key), std::forward<V>(val));
  126. asio2::ignore_unused(_1, inserted);
  127. if (inserted)
  128. {
  129. this->increase_subscriptions(iters);
  130. this->increase_subscribe_count();
  131. }
  132. return std::pair(iters.back()->first, inserted);
  133. }
  134. }
  135. // Insert a key => value with a handle to the topic filter
  136. // returns the handle and true if key was inserted, false if key was updated
  137. template <typename K, typename V>
  138. std::pair<key_type, bool> insert_or_assign(key_type const& h, K&& key, V&& val)
  139. {
  140. asio2::unique_locker g(this->submap_mutex_);
  141. auto it = this->map_.find(h);
  142. if (it == this->map_.end())
  143. {
  144. return std::pair(key_type(0, "null"), false);
  145. }
  146. auto& subscribers = it->second.subscribers;
  147. this->emplace_subscriber_node(key, it->first);
  148. auto[_1, inserted] = subscribers.insert_or_assign(std::forward<K>(key), std::forward<V>(val));
  149. asio2::ignore_unused(_1, inserted);
  150. if (inserted)
  151. {
  152. this->increase_subscriptions(h);
  153. this->increase_subscribe_count();
  154. }
  155. return std::pair(h, inserted);
  156. }
  157. // returns the number of removed elements
  158. std::size_t erase(key_type const& h, Key const& key)
  159. {
  160. asio2::unique_locker g(this->submap_mutex_);
  161. auto it = this->map_.find(h);
  162. if (it == this->map_.end())
  163. {
  164. return 0;
  165. }
  166. this->erase_subscriber_node(key, h);
  167. auto amount = it->second.subscribers.erase(key);
  168. if (amount)
  169. {
  170. std::vector<map_iterator> v = this->handle_to_iterators(h);
  171. this->erase(v);
  172. this->decrease_subscribe_count();
  173. }
  174. return amount;
  175. }
  176. // returns the number of removed elements
  177. std::size_t erase(std::string_view topic_filter, Key const& key)
  178. {
  179. asio2::unique_locker g(this->submap_mutex_);
  180. std::vector<map_iterator> iters = this->get_nodes_by_topic_filter(topic_filter);
  181. if (iters.empty())
  182. {
  183. return 0;
  184. }
  185. this->erase_subscriber_node(key, iters.back()->first);
  186. auto amount = iters.back()->second.subscribers.erase(key);
  187. if (amount)
  188. {
  189. this->decrease_subscribe_count();
  190. this->erase(iters);
  191. }
  192. return amount;
  193. }
  194. // returns the number of removed elements
  195. std::size_t erase(Key const& key)
  196. {
  197. asio2::unique_locker g(this->submap_mutex_);
  198. auto iter = this->subscriber_nodes_.find(key);
  199. if (iter == this->subscriber_nodes_.end())
  200. return 0;
  201. std::size_t total = 0;
  202. for (auto& h : iter->second)
  203. {
  204. auto it = this->map_.find(h);
  205. if (it == this->map_.end())
  206. continue;
  207. auto amount = it->second.subscribers.erase(key);
  208. if (amount)
  209. {
  210. std::vector<map_iterator> v = this->handle_to_iterators(h);
  211. this->erase(v);
  212. this->decrease_subscribe_count();
  213. }
  214. total += amount;
  215. }
  216. this->subscriber_nodes_.erase(key);
  217. return total;
  218. }
  219. protected:
  220. inline map_iterator get_root() ASIO2_NO_THREAD_SAFETY_ANALYSIS { return map_.find(root_key_); };
  221. inline map_const_iterator get_root() const ASIO2_NO_THREAD_SAFETY_ANALYSIS { return map_.find(root_key_); };
  222. // Increase the map size (total number of subscriptions stored)
  223. inline void increase_subscribe_count()
  224. {
  225. ++subscribe_count_;
  226. }
  227. // Decrease the map size (total number of subscriptions stored)
  228. inline void decrease_subscribe_count()
  229. {
  230. ASIO2_ASSERT(subscribe_count_ > 0);
  231. --subscribe_count_;
  232. }
  233. inline void increase_subscriptions(key_type const& h)
  234. {
  235. handle_to_iterators(h, [](map_iterator it) mutable
  236. {
  237. it->second.count++;
  238. });
  239. }
  240. // Increase the number of subscriptions for this path
  241. inline void increase_subscriptions(std::vector<map_iterator>& iters)
  242. {
  243. for (auto i : iters)
  244. {
  245. i->second.count++;
  246. }
  247. }
  248. std::optional<key_type> lookup(std::string_view topic_filter)
  249. {
  250. std::vector<map_iterator> iters = this->get_nodes_by_topic_filter(topic_filter);
  251. if (iters.empty())
  252. return std::nullopt;
  253. else
  254. return iters.back()->first;
  255. }
  256. std::vector<map_iterator> emplace(std::string_view topic_filter) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  257. {
  258. map_iterator parent = this->get_root();
  259. std::vector<map_iterator> iters;
  260. topic_filter_tokenizer(topic_filter, [this, &iters, &parent](std::string_view t) mutable
  261. {
  262. return emplace_subfun(iters, parent, t);
  263. });
  264. return iters;
  265. }
  266. inline bool emplace_subfun(std::vector<map_iterator>& iters, map_iterator& parent, std::string_view t)
  267. ASIO2_NO_THREAD_SAFETY_ANALYSIS
  268. {
  269. node& pn = parent->second;
  270. auto it = map_.find(key_type(pn.id, t));
  271. if (it == map_.end())
  272. {
  273. node val{ idmgr_.get(), parent->first, t };
  274. key_type key{ pn.id, val.tokenize_view() };
  275. it = map_.emplace(std::move(key), std::move(val)).first;
  276. pn.has_plus |= (t == "+");
  277. pn.has_hash |= (t == "#");
  278. }
  279. else
  280. {
  281. it->second.count++;
  282. }
  283. iters.emplace_back(it);
  284. parent = std::move(it);
  285. return true;
  286. }
  287. void erase(std::vector<map_iterator>& iters) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  288. {
  289. bool remove_plus_flag = false;
  290. bool remove_hash_flag = false;
  291. std::reverse(iters.begin(), iters.end());
  292. for (map_iterator& it : iters)
  293. {
  294. node& n = it->second;
  295. if (remove_plus_flag)
  296. {
  297. n.has_plus = false;
  298. remove_plus_flag = false;
  299. }
  300. if (remove_hash_flag)
  301. {
  302. n.has_hash = false;
  303. remove_hash_flag = false;
  304. }
  305. ASIO2_ASSERT(n.count > 0);
  306. n.count--;
  307. if (n.count == 0)
  308. {
  309. remove_plus_flag = (it->first.second == "+");
  310. remove_hash_flag = (it->first.second == "#");
  311. this->idmgr_.release(it->second.id);
  312. // std::unordered_map<Key,T,Hash,KeyEqual,Allocator>::erase
  313. // References and iterators to the erased elements are invalidated.
  314. // Other iterators and references are not invalidated.
  315. map_.erase(it);
  316. }
  317. }
  318. map_iterator root = this->get_root();
  319. if (remove_plus_flag)
  320. {
  321. root->second.has_plus = false;
  322. }
  323. if (remove_hash_flag)
  324. {
  325. root->second.has_hash = false;
  326. }
  327. }
  328. template <typename K>
  329. inline void emplace_subscriber_node(K&& key, key_type node_key) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  330. {
  331. std::unordered_set<key_type, hasher>& node_keys = this->subscriber_nodes_[key];
  332. node_keys.emplace(std::move(node_key));
  333. }
  334. inline void erase_subscriber_node(const Key& key, const key_type& node_key) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  335. {
  336. std::unordered_set<key_type, hasher>& node_keys = this->subscriber_nodes_[key];
  337. node_keys.erase(node_key);
  338. if (node_keys.empty())
  339. {
  340. this->subscriber_nodes_.erase(key);
  341. }
  342. }
  343. std::vector<map_iterator> get_nodes_by_topic_filter(std::string_view topic_filter)
  344. ASIO2_NO_THREAD_SAFETY_ANALYSIS
  345. {
  346. std::size_t id = this->get_root()->second.id;
  347. std::vector<map_iterator> iters;
  348. topic_filter_tokenizer(topic_filter, [this, &iters, &id](std::string_view t) mutable
  349. {
  350. return this->get_nodes_by_topic_filter_subfun(iters, id, t);
  351. });
  352. return iters;
  353. }
  354. inline bool get_nodes_by_topic_filter_subfun(
  355. std::vector<map_iterator>& iters, std::size_t& id, std::string_view t) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  356. {
  357. auto it = map_.find(key_type(id, t));
  358. if (it == map_.end())
  359. {
  360. iters.clear();
  361. return false;
  362. }
  363. id = it->second.id;
  364. iters.emplace_back(it);
  365. return true;
  366. }
  367. template<typename Function>
  368. bool match_subfun(std::vector<map_iterator>& iters, Function& callback, std::string_view t)
  369. ASIO2_NO_THREAD_SAFETY_ANALYSIS
  370. {
  371. std::vector<map_iterator> new_iters;
  372. for (auto& it : iters)
  373. {
  374. node& pn = it->second;
  375. auto i = this->map_.find(key_type(pn.id, t));
  376. if (i != this->map_.end())
  377. {
  378. new_iters.emplace_back(i);
  379. }
  380. if (pn.has_plus)
  381. {
  382. i = this->map_.find(key_type(pn.id, std::string_view("+")));
  383. if (i != this->map_.end())
  384. {
  385. if (pn.id != this->root_node_id_ || t.empty() || t[0] != '$')
  386. {
  387. new_iters.emplace_back(i);
  388. }
  389. }
  390. }
  391. if (pn.has_hash)
  392. {
  393. i = this->map_.find(key_type(pn.id, std::string_view("#")));
  394. if (i != this->map_.end())
  395. {
  396. if (pn.id != this->root_node_id_ || t.empty() || t[0] != '$')
  397. {
  398. for (auto& [k, v] : i->second.subscribers)
  399. {
  400. callback(k, v);
  401. }
  402. }
  403. }
  404. }
  405. }
  406. std::swap(iters, new_iters);
  407. return !iters.empty();
  408. }
  409. template<typename Function>
  410. void handle_to_iterators(key_type const& h, Function&& callback) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  411. {
  412. key_type k = h;
  413. while (k != this->root_key_)
  414. {
  415. auto it = this->map_.find(k);
  416. if (it == this->map_.end())
  417. {
  418. return;
  419. }
  420. callback(it);
  421. k = it->second.parent;
  422. }
  423. }
  424. inline std::vector<map_iterator> handle_to_iterators(key_type const& h) ASIO2_NO_THREAD_SAFETY_ANALYSIS
  425. {
  426. std::vector<map_iterator> iters;
  427. this->handle_to_iterators(h, [&iters](map_iterator it) mutable
  428. {
  429. iters.emplace_back(it);
  430. });
  431. std::reverse(iters.begin(), iters.end());
  432. return iters;
  433. }
  434. // Get path of topic_filter
  435. std::string handle_to_topic_filter(key_type const& h) const ASIO2_NO_THREAD_SAFETY_ANALYSIS
  436. {
  437. std::string result;
  438. handle_to_iterators(h, [&result](map_iterator it) mutable
  439. {
  440. if (result.empty())
  441. {
  442. result = std::string(it->first.second);
  443. }
  444. else
  445. {
  446. result.insert(0, "/");
  447. result.insert(0, it->first.second);
  448. }
  449. });
  450. return result;
  451. }
  452. protected:
  453. static constexpr key_type root_key_{ 0, "" };
  454. /// use rwlock to make thread safe
  455. mutable asio2::shared_mutexer submap_mutex_;
  456. std::size_t root_node_id_ = 1;
  457. map_type map_ ASIO2_GUARDED_BY(submap_mutex_);
  458. // Key - client id, Val - all nodes keys for the subscriber
  459. std::unordered_map<Key, std::unordered_set<key_type, hasher>> subscriber_nodes_ ASIO2_GUARDED_BY(submap_mutex_);
  460. // Map size tracks the total number of subscriptions within the map
  461. std::atomic<std::size_t> subscribe_count_ = 0;
  462. mqtt::idmgr<std::set<std::size_t>> idmgr_;
  463. };
  464. }
  465. #endif // !__ASIO2_MQTT_SUBSCRIPTION_MAP_HPP__