123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- /*
- * Copyright (c) 2017-2023 zhllxt
- *
- * author : zhllxt
- * email : 37792738@qq.com
- *
- * Distributed under the Boost Software License, Version 1.0. (See accompanying
- * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- *
- * refrenced from : mqtt_cpp/include/mqtt/broker/retained_messages.hpp
- */
- #ifndef __ASIO2_MQTT_RETAINED_MESSAGES_HPP__
- #define __ASIO2_MQTT_RETAINED_MESSAGES_HPP__
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- #pragma once
- #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
- #include <cstdint>
- #include <string>
- #include <string_view>
- #include <type_traits>
- #include <unordered_map>
- #include <algorithm>
- #include <optional>
- #include <deque>
- #include <asio2/base/detail/shared_mutex.hpp>
- #include <asio2/mqtt/message.hpp>
- #include <asio2/mqtt/detail/mqtt_topic_util.hpp>
- namespace asio2::mqtt
- {
- template<typename Value>
- class retained_messages
- {
- public:
- using key_type = std::pair<std::size_t, std::string_view>;
- struct hasher
- {
- inline std::size_t operator()(key_type const& pair) const noexcept
- {
- std::size_t v = asio2::detail::fnv1a_hash<std::size_t>(
- (const unsigned char*)(std::addressof(pair.first)), sizeof(std::size_t));
- return asio2::detail::fnv1a_hash<std::size_t>(v,
- (const unsigned char*)(pair.second.data()), pair.second.size());
- }
- };
- protected:
- // Exceptions used
- static void throw_max_stored_topics()
- {
- throw std::overflow_error("Retained map maximum number of topics reached");
- }
- static void throw_no_wildcards_allowed()
- {
- throw std::runtime_error("Retained map no wildcards allowed in retained topic name");
- }
- static constexpr std::size_t root_parent_id = 0;
- static constexpr std::size_t root_node_id = 1;
- static constexpr std::size_t max_node_id = (std::numeric_limits<std::size_t>::max)();
- struct path_entry
- {
- std::size_t parent_id;
- std::string_view name;
- std::size_t id;
- std::size_t count = 1;
- static constexpr std::size_t max_count = (std::numeric_limits<std::size_t>::max)();
- // Increase the count for this node
- inline void increase_count()
- {
- if (count == max_count)
- {
- throw_max_stored_topics();
- }
- ++count;
- }
- // Decrease the count for this node
- inline void decrease_count()
- {
- ASIO2_ASSERT(count >= 1);
- --count;
- }
- std::optional<Value> value;
- path_entry(std::size_t parent_id, std::string_view name, std::size_t id)
- : parent_id(parent_id)
- , name(name)
- , id(id)
- { }
- };
- using map_type = std::unordered_map<key_type, path_entry, hasher>;
- using map_iterator = typename map_type::iterator;
- using map_const_iterator = typename map_type::const_iterator;
- /// use rwlock to make thread safe
- mutable asio2::shared_mutexer retained_mutex_;
- std::unordered_map <key_type, path_entry, hasher> map_ ASIO2_GUARDED_BY(retained_mutex_);
- std::unordered_multimap<std::size_t, path_entry* > wildcard_map_ ASIO2_GUARDED_BY(retained_mutex_);
- std::size_t map_size ASIO2_GUARDED_BY(retained_mutex_);
- std::size_t next_node_id ASIO2_GUARDED_BY(retained_mutex_);
- inline map_iterator create_topic(std::string_view topic_name) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- map_iterator parent = get_root();
- topic_filter_tokenizer(topic_name, [this, &parent](std::string_view t) mutable
- {
- return this->create_topic_subfun(parent, t);
- });
- return parent;
- }
- inline bool create_topic_subfun(map_iterator& parent, std::string_view t) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- if (t == "+" || t == "#")
- {
- throw_no_wildcards_allowed();
- }
- std::size_t parent_id = parent->second.id;
- map_iterator it = map_.find(key_type(parent_id, t));
- if (it == map_.end())
- {
- it = map_.emplace(
- key_type(parent_id, t),
- path_entry(parent_id, t, next_node_id++)
- ).first;
- wildcard_map_.emplace(parent_id, std::addressof(it->second));
- if (next_node_id == max_node_id)
- {
- throw_max_stored_topics();
- }
- }
- else
- {
- it->second.increase_count();
- }
- parent = it;
- return true;
- }
- inline std::vector<map_iterator> find_topic(std::string_view topic_name) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- std::vector<map_iterator> path;
- map_iterator parent = get_root();
- topic_filter_tokenizer(topic_name, [this, &path, &parent](std::string_view t) mutable
- {
- return this->find_topic_subfun(path, parent, t);
- });
- return path;
- }
- inline bool find_topic_subfun(std::vector<map_iterator>& path, map_iterator& parent, std::string_view t)
- ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- auto it = map_.find(key_type(parent->second.id, t));
- if (it == map_.end())
- {
- path.clear();
- return false;
- }
- path.push_back(it);
- parent = it;
- return true;
- }
- // Match all underlying topics when a hash entry is matched
- // perform a breadth-first iteration over all items in the tree below
- template<typename Output>
- inline void match_hash_entries(std::size_t parent_id, Output&& callback, bool ignore_system)
- ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- std::deque<std::size_t> ids;
- ids.push_back(parent_id);
- std::deque<std::size_t> new_ids;
- while (!ids.empty())
- {
- new_ids.resize(0);
- for (auto it : ids)
- {
- auto range = wildcard_map_.equal_range(it);
- for (auto i = range.first; i != range.second && i->second->parent_id == it; ++i)
- {
- // Should we ignore system matches
- if (!ignore_system || i->second->name.empty() || i->second->name[0] != '$')
- {
- if (i->second->value)
- {
- callback(i->second->value.value());
- }
- new_ids.push_back(i->second->id);
- }
- }
- }
- // Ignore system only on first level
- ignore_system = false;
- std::swap(ids, new_ids);
- }
- }
- // Find all topics that match the specified topic filter
- template<typename Output>
- inline void find_match(std::string_view topic_filter, Output&& callback) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- std::deque<map_iterator> iters;
- iters.push_back(get_root());
- std::deque<map_iterator> new_iters;
- topic_filter_tokenizer(topic_filter,
- [this, &iters, &new_iters, &callback](std::string_view t) mutable
- {
- return this->find_match_subfun(iters, new_iters, callback, t);
- });
- for (auto& it : iters)
- {
- if (it->second.value)
- {
- callback(it->second.value.value());
- }
- }
- }
- template<typename Output>
- inline bool find_match_subfun(
- std::deque<map_iterator>& iters, std::deque<map_iterator>& new_iters, Output& callback, std::string_view t)
- ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- new_iters.resize(0);
- for (auto& it : iters)
- {
- std::size_t parent_id = it->second.id;
- if (t == std::string_view("+"))
- {
- auto range = wildcard_map_.equal_range(parent_id);
- for (auto i = range.first; i != range.second && i->second->parent_id == parent_id; ++i)
- {
- if (parent_id != root_node_id || i->second->name.empty() || i->second->name[0] != '$')
- {
- auto j = map_.find(key_type(i->second->parent_id, i->second->name));
- ASIO2_ASSERT(j != map_.end());
- new_iters.push_back(j);
- }
- else
- {
- break;
- }
- }
- }
- else if (t == std::string_view("#"))
- {
- match_hash_entries(parent_id, callback, parent_id == root_node_id);
- return false;
- }
- else
- {
- map_iterator i = map_.find(key_type(parent_id, t));
- if (i != map_.end())
- {
- new_iters.push_back(i);
- }
- }
- }
- std::swap(new_iters, iters);
- return !iters.empty();
- }
- // Remove a value at the specified topic name
- inline std::size_t erase_topic(std::string_view topic_name) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- auto path = find_topic(topic_name);
- // Reset the value if there is actually something stored
- if (!path.empty() && path.back()->second.value)
- {
- path.back()->second.value = std::nullopt;
- // Do iterators stay valid when erasing ? I think they do ?
- for (auto iter : path)
- {
- iter->second.decrease_count();
- if (iter->second.count == 0)
- {
- auto range = wildcard_map_.equal_range(std::get<0>(iter->first));
- for (auto it = range.first; it != range.second; ++it)
- {
- if (std::addressof(iter->second) == it->second)
- {
- wildcard_map_.erase(it);
- break;
- }
- }
- map_.erase(iter);
- }
- }
- return 1;
- }
- return 0;
- }
- // Increase the number of topics for this path
- inline void increase_topics(std::vector<map_iterator> const &path) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- for (auto& it : path)
- {
- it->second.increase_count();
- }
- }
- // Increase the map size (total number of topics stored)
- inline void increase_map_size() ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- if (map_size == (std::numeric_limits<decltype(map_size)>::max)())
- {
- throw_max_stored_topics();
- }
- ++map_size;
- }
- // Decrease the map size (total number of topics stored)
- inline void decrease_map_size(std::size_t count) ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- ASIO2_ASSERT(map_size >= count);
- map_size -= count;
- }
- inline void init_map() ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- map_size = 0;
- // Create the root node
- auto it = map_.emplace(key_type(root_parent_id, ""),
- path_entry(root_parent_id, "", root_node_id)).first;
- next_node_id = root_node_id + 1;
- //
- wildcard_map_.emplace(root_parent_id, std::addressof(it->second));
- }
- inline map_iterator get_root() ASIO2_NO_THREAD_SAFETY_ANALYSIS
- {
- return map_.find(key_type(root_parent_id, ""));
- }
- public:
- retained_messages()
- {
- init_map();
- }
- // Insert a value at the specified topic name
- template<typename V>
- inline std::size_t insert_or_assign(std::string_view topic_name, V&& value)
- {
- asio2::unique_locker g(this->retained_mutex_);
- auto path = this->find_topic(topic_name);
- if (path.empty())
- {
- auto new_topic = this->create_topic(topic_name);
- new_topic->second.value.emplace(std::forward<V>(value));
- increase_map_size();
- return 1;
- }
- if (!path.back()->second.value)
- {
- this->increase_topics(path);
- path.back()->second.value.emplace(std::forward<V>(value));
- increase_map_size();
- return 1;
- }
- // replace the value
- path.back()->second.value.emplace(std::forward<V>(value));
- return 0;
- }
- // Find all stored topics that math the specified topic_filter
- template<typename Output>
- inline void find(std::string_view topic_filter, Output&& callback)
- {
- asio2::shared_locker g(this->retained_mutex_);
- find_match(topic_filter, std::forward<Output>(callback));
- }
- // Remove a stored value at the specified topic name
- inline std::size_t erase(std::string_view topic_name)
- {
- asio2::unique_locker g(this->retained_mutex_);
- auto result = erase_topic(topic_name);
- decrease_map_size(result);
- return result;
- }
- inline std::size_t size() const
- {
- asio2::shared_locker g(this->retained_mutex_);
- return map_size;
- }
- inline std::size_t internal_size() const
- {
- asio2::shared_locker g(this->retained_mutex_);
- return map_.size();
- }
- // Clear all topics
- inline void clear()
- {
- asio2::unique_locker g(this->retained_mutex_);
- map_.clear();
- wildcard_map_.clear();
- init_map();
- }
- // Dump debug information
- template<typename Output>
- inline void dump(Output &out)
- {
- asio2::shared_locker g(this->retained_mutex_);
- for (auto const&[k, v] : map_)
- {
- std::ignore = k;
- out << v.parent_id << " " << v.name << " " << (v.value ? "init" : "-")
- << " " << v.count << std::endl;
- }
- }
- };
- // A collection of messages that have been retained in
- // case clients add a new subscription to the associated topics.
- struct rmnode
- {
- template<class Message>
- explicit rmnode(Message&& msg, std::shared_ptr<asio::steady_timer> expiry_timer)
- : message(std::forward<Message>(msg))
- , message_expiry_timer(std::move(expiry_timer))
- {
- }
- mqtt::message message;
- std::shared_ptr<asio::steady_timer> message_expiry_timer;
- };
- }
- #endif // !__ASIO2_MQTT_RETAINED_MESSAGES_HPP__
|