| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662 | /* * Copyright (c) 2017-2023 zhllxt * * author   : zhllxt * email    : 37792738@qq.com *  * Distributed under the Boost Software License, Version 1.0. (See accompanying * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) */#ifndef __ASIO2_MQTT_SUBSCRIBE_ROUTER_HPP__#define __ASIO2_MQTT_SUBSCRIBE_ROUTER_HPP__#if defined(_MSC_VER) && (_MSC_VER >= 1200)#pragma once#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)#include <asio2/mqtt/detail/mqtt_message_router.hpp>namespace asio2::detail{	ASIO2_CLASS_FORWARD_DECLARE_BASE;	ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;	ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;	ASIO2_CLASS_FORWARD_DECLARE_TCP_SESSION;	ASIO2_CLASS_FORWARD_DECLARE_TCP_CLIENT;	template<class derived_t, class args_t>	class mqtt_subscribe_router_t	{		friend derived_t;		ASIO2_CLASS_FRIEND_DECLARE_BASE;		ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;		ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;		ASIO2_CLASS_FRIEND_DECLARE_TCP_SESSION;		ASIO2_CLASS_FRIEND_DECLARE_TCP_CLIENT;	public:		using self = mqtt_subscribe_router_t<derived_t, args_t>;		using args_type = args_t;		using subnode_type = typename args_type::template subnode<derived_t>;		using key_type = typename mqtt_message_router_t<derived_t, args_t>::key_type;		struct hasher		{			inline std::size_t operator()(key_type const& pair) const noexcept			{				std::size_t v = asio2::detail::fnv1a_hash<std::size_t>(					(const unsigned char*)(std::addressof(pair.first)), sizeof(pair.first));				return asio2::detail::fnv1a_hash<std::size_t>(v,					(const unsigned char*)(std::addressof(pair.second)), sizeof(pair.second));			}		};		/**		 * @brief constructor		 */		mqtt_subscribe_router_t() = default;		/**		 * @brief destructor		 */		~mqtt_subscribe_router_t() = default;		template<class ReturnT = void, class QosOrInt, class FunctionT>		typename std::enable_if_t<			std::is_same_v<detail::remove_cvref_t<QosOrInt>, mqtt::qos_type> ||			std::is_integral_v<detail::remove_cvref_t<QosOrInt>>, ReturnT>		subscribe(std::string topic_filter, QosOrInt qos, FunctionT&& callback)		{			derived_t& derive = static_cast<derived_t&>(*this);			mqtt::version ver = derive.version();			auto pid = derive.idmgr_.get();			if /**/ (ver == mqtt::version::v3)			{				mqtt::v3::subscribe msg;				msg.packet_id(pid);				msg.add_subscriptions(mqtt::subscription(std::move(topic_filter), qos));				return this->subscribe<ReturnT>(std::move(msg), std::forward<FunctionT>(callback));			}			else if (ver == mqtt::version::v4)			{				mqtt::v4::subscribe msg;				msg.packet_id(pid);				msg.add_subscriptions(mqtt::subscription(std::move(topic_filter), qos));				return this->subscribe<ReturnT>(std::move(msg), std::forward<FunctionT>(callback));			}			else if (ver == mqtt::version::v5)			{				mqtt::v5::subscribe msg;				msg.packet_id(pid);				msg.add_subscriptions(mqtt::subscription(std::move(topic_filter), qos));				return this->subscribe<ReturnT>(std::move(msg), std::forward<FunctionT>(callback));			}			else			{				derive.idmgr_.release(pid);				set_last_error(asio::error::invalid_argument);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}		}		template<class ReturnT = void, class Message, class FunctionT>		typename std::enable_if_t<			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v3::subscribe> || 			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v4::subscribe> ||			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v5::subscribe>, ReturnT>		subscribe(Message&& msg, FunctionT&& callback)		{			derived_t& derive = static_cast<derived_t&>(*this);			if (!derive.is_started())			{				set_last_error(asio::error::not_connected);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}			if (msg.subscriptions().data().empty())			{				set_last_error(asio::error::invalid_argument);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}			for (mqtt::subscription& sub : msg.subscriptions().data())			{				if (!mqtt::is_valid_qos(sub.qos()))				{					set_last_error(asio::error::invalid_argument);					ASIO2_ASSERT(false);					return derive.template _empty_result<ReturnT>();				}			}			clear_last_error();			[[maybe_unused]] key_type key = { msg.packet_type(), msg.packet_id() };			derive._dispatch_subscribe(std::forward<Message>(msg), std::forward<FunctionT>(callback));			if (derive.io_->running_in_this_thread())			{				return derive.template _in_progress<ReturnT>();			}			ASIO2_ASSERT(!derive.io_->running_in_this_thread());			if /**/ constexpr (std::is_same_v<ReturnT, void>)			{				return;			}			else if constexpr (std::is_same_v<ReturnT, bool>)			{				return derive._do_router(key, [](auto& msg) mutable				{					if constexpr (mqtt::is_suback_message<decltype(msg)>())					{						for (auto&& reason : msg.reason_codes().data())						{							if (!mqtt::is_valid_qos(reason.value()))							{								return false;							}						}						return true;					}					else					{						return false;					}				});			}			else if constexpr (std::is_same_v<ReturnT, mqtt::message>)			{				return derive._do_router(key);			}			else			{				static_assert(detail::always_false_v<ReturnT>);			}		}		template<class ReturnT = void>		ReturnT unsubscribe(std::string topic_filter)		{			derived_t& derive = static_cast<derived_t&>(*this);			mqtt::version ver = derive.version();			auto pid = derive.idmgr_.get();			if /**/ (ver == mqtt::version::v3)			{				return this->unsubscribe<ReturnT>(mqtt::v3::unsubscribe(pid, std::move(topic_filter)));			}			else if (ver == mqtt::version::v4)			{				return this->unsubscribe<ReturnT>(mqtt::v4::unsubscribe(pid, std::move(topic_filter)));			}			else if (ver == mqtt::version::v5)			{				return this->unsubscribe<ReturnT>(mqtt::v5::unsubscribe(pid, std::move(topic_filter)));			}			else			{				derive.idmgr_.release(pid);				set_last_error(asio::error::invalid_argument);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}		}		template<class ReturnT = void, class Message>		typename std::enable_if_t<			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v3::unsubscribe> || 			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v4::unsubscribe> ||			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v5::unsubscribe>, ReturnT>		unsubscribe(Message&& msg)		{			derived_t& derive = static_cast<derived_t&>(*this);			if (!derive.is_started())			{				set_last_error(asio::error::not_connected);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}			if (msg.topic_filters().data().empty())			{				set_last_error(asio::error::invalid_argument);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}			clear_last_error();			[[maybe_unused]] key_type key = { msg.packet_type(), msg.packet_id() };			// must ensure the member variable is read write in the io_context thread.			// save the subscribed key and topic filters, beacuse if the unsubscribed			// is sucussed, we need remove the topics from the sub map.			derive.dispatch([&derive, key, topics = msg.topic_filters()]() mutable			{				if (derive.subs_map().get_subscribe_count() > 0)					derive.unsubscribed_topics_.emplace(key, std::move(topics));			});			derive.async_send(std::forward<Message>(msg), [&derive, key]() mutable			{				// if send data failed, we need remove the added key and topics from the map.				if (asio2::get_last_error())				{					derive.unsubscribed_topics_.erase(key);				}			});			if (derive.io_->running_in_this_thread())			{				return derive.template _in_progress<ReturnT>();			}			ASIO2_ASSERT(!derive.io_->running_in_this_thread());			if /**/ constexpr (std::is_same_v<ReturnT, void>)			{				return;			}			else if constexpr (std::is_same_v<ReturnT, bool>)			{				return derive._do_router(key, [](auto& msg) mutable				{					if constexpr (mqtt::is_unsuback_message<decltype(msg)>())					{						// UNSUBACK Payload : The Payload contains a list of Reason Codes.						// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901194						if constexpr (std::is_same_v<detail::remove_cvref_t<decltype(msg)>, mqtt::v5::unsuback>)						{							for (auto&& reason : msg.reason_codes().data())							{								mqtt::error e = static_cast<mqtt::error>(reason.value());								if (!(e == mqtt::error::success || e == mqtt::error::no_subscription_existed))								{									return false;								}							}							return true;						}						else						{							return true;						}					}					else					{						return false;					}				});			}			else if constexpr (std::is_same_v<ReturnT, mqtt::message>)			{				return derive._do_router(key);			}			else			{				static_assert(detail::always_false_v<ReturnT>);			}		}		template<class ReturnT = void, class QosOrInt>		ReturnT publish(std::string topic_name, std::string payload, QosOrInt qos)		{			derived_t& derive = static_cast<derived_t&>(*this);			mqtt::version ver = derive.version();			if /**/ (ver == mqtt::version::v3)			{				return this->publish<ReturnT>(mqtt::v3::publish(std::move(topic_name), std::move(payload), qos));			}			else if (ver == mqtt::version::v4)			{				return this->publish<ReturnT>(mqtt::v4::publish(std::move(topic_name), std::move(payload), qos));			}			else if (ver == mqtt::version::v5)			{				return this->publish<ReturnT>(mqtt::v5::publish(std::move(topic_name), std::move(payload), qos));			}			else			{				set_last_error(asio::error::invalid_argument);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}		}		template<class ReturnT = void, class Message>		typename std::enable_if_t<			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v3::publish> || 			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v4::publish> ||			std::is_same_v<detail::remove_cvref_t<Message>, mqtt::v5::publish>, ReturnT>		publish(Message&& msg)		{			derived_t& derive = static_cast<derived_t&>(*this);			if (!derive.is_started())			{				set_last_error(asio::error::not_connected);				ASIO2_ASSERT(false);				return derive.template _empty_result<ReturnT>();			}			if (msg.qos() > mqtt::qos_type::at_most_once && !msg.has_packet_id())			{				msg.packet_id(derive.idmgr_.get());			}			clear_last_error();			[[maybe_unused]] std::optional<key_type> key{};			if (msg.qos() > mqtt::qos_type::at_most_once && msg.has_packet_id())				key = { msg.packet_type(), msg.packet_id() };			derive.async_send(std::forward<Message>(msg));			// qos 0 don't need a response			if (!key.has_value())			{				return derive.template _empty_result<ReturnT>();			}			if (derive.io_->running_in_this_thread())			{				return derive.template _in_progress<ReturnT>();			}			ASIO2_ASSERT(!derive.io_->running_in_this_thread());			if /**/ constexpr (std::is_same_v<ReturnT, void>)			{				return;			}			else if constexpr (std::is_same_v<ReturnT, bool>)			{				return derive._do_bool_publish(key);			}			else if constexpr (std::is_same_v<ReturnT, mqtt::message>)			{				return derive._do_router(key.value());			}			else			{				static_assert(detail::always_false_v<ReturnT>);			}		}	protected:		template<class K>		bool _do_bool_publish(std::optional<K>& key)		{			derived_t& derive = static_cast<derived_t&>(*this);			return derive._do_router(key.value(), [](auto& msg) mutable			{				// qos 1 response				if constexpr (mqtt::is_puback_message<decltype(msg)>())				{					if constexpr (std::is_same_v<detail::remove_cvref_t<decltype(msg)>, mqtt::v5::puback>)					{						mqtt::error e = static_cast<mqtt::error>(msg.reason_code());						if (!(e == mqtt::error::success))						{							return false;						}						return true;					}					else					{						return true;					}				}				else if constexpr (mqtt::is_pubcomp_message<decltype(msg)>())				{					if constexpr (std::is_same_v<detail::remove_cvref_t<decltype(msg)>, mqtt::v5::pubcomp>)					{						mqtt::error e = static_cast<mqtt::error>(msg.reason_code());						if (!(e == mqtt::error::success))						{							return false;						}						return true;					}					else					{						return true;					}				}				else				{					return false;				}			});		}		template<class ReturnT>		inline ReturnT _empty_result()		{			if constexpr (std::is_same_v<ReturnT, void>)			{				return;			}			else			{				return ReturnT{};			}		}		template<class ReturnT>		inline ReturnT _in_progress()		{			if /**/ constexpr (std::is_same_v<ReturnT, void>)			{				set_last_error(asio::error::in_progress);				return;			}			else if constexpr (std::is_same_v<ReturnT, bool>)			{				set_last_error(asio::error::in_progress);				return true;			}			else if constexpr (std::is_same_v<ReturnT, mqtt::message>)			{				set_last_error(asio::error::in_progress);				return mqtt::message{};			}			else			{				static_assert(detail::always_false_v<ReturnT>);			}		}		/**		 * callback signature : bool (auto& msg)		 */		template<class KeyT, class FunctionT>		bool _do_router(KeyT key, FunctionT&& callback)		{			derived_t& derive = static_cast<derived_t&>(*this);			std::promise<bool> p;			std::future<bool> f = p.get_future();			derive._add_router(key, [&callback, p = std::move(p)](mqtt::message& m) mutable			{				std::visit([&callback, &p](auto& msg) mutable				{					p.set_value(callback(msg));				}, m.base());			});			std::future_status status = f.wait_for(derive.get_default_timeout());			if (status == std::future_status::ready)			{				derive._del_router(key);				return true;			}			set_last_error(asio::error::timed_out);			derive._del_router(key);			return false;		}		template<class KeyT>		mqtt::message _do_router(KeyT key)		{			derived_t& derive = static_cast<derived_t&>(*this);			std::shared_ptr<mqtt::message> r = std::make_shared<mqtt::message>();			std::promise<void> p;			std::future<void> f = p.get_future();			derive._add_router(key, [r, p = std::move(p)](mqtt::message& m) mutable			{				*r = m;				p.set_value();			});			std::future_status status = f.wait_for(derive.get_default_timeout());			if (status == std::future_status::ready)			{				derive._del_router(key);				return std::move(*r);			}			set_last_error(asio::error::timed_out);			derive._del_router(key);			return mqtt::message{};		}		template<class Message, class FunctionT>		inline void _dispatch_subscribe(Message&& msg, FunctionT&& callback)		{			derived_t& derive = static_cast<derived_t&>(*this);			derive.dispatch(			[&derive, msg = std::forward<Message>(msg), cb = std::forward<FunctionT>(callback)]() mutable			{				derive._do_subscribe(std::move(msg), std::move(cb));			});		}		template<class Message, class FunctionT>		void _do_subscribe(Message&& msg, FunctionT&& callback)		{			using message_type = typename detail::remove_cvref_t<Message>;			using fun_traits_type = function_traits<FunctionT>;			using arg0_type = typename std::remove_cv_t<std::remove_reference_t<				typename fun_traits_type::template args<0>::type>>;			derived_t& derive = static_cast<derived_t&>(*this);			ASIO2_ASSERT(derive.io_->running_in_this_thread());			mqtt::v5::properties_set props;			if constexpr (std::is_same_v<message_type, mqtt::v5::subscribe>)			{				props = msg.properties();			}			else			{				std::ignore = true;			}			std::vector<mqtt::subscription>& subs = msg.subscriptions().data();			for (std::size_t i = 0; i < subs.size(); ++i)			{				mqtt::subscription& sub = subs[i];				bool end = (i + 1 == subs.size());				subnode_type node{ derive.selfptr(), sub, end ? std::move(props) : props };				if constexpr (std::is_same_v<arg0_type, mqtt::message>)				{					node.callback = end ? std::forward<FunctionT>(callback) : callback;				}				else				{					node.callback = [cb = end ? std::forward<FunctionT>(callback) : callback]					(mqtt::message& msg) mutable					{						arg0_type* p = std::get_if<arg0_type>(std::addressof(msg.base()));						if (p)						{							cb(*p);						}					};				}				std::string_view share_name   = node.share_name();				std::string_view topic_filter = node.topic_filter();				auto[_1, inserted] = this->subs_map().insert_or_assign(topic_filter, "", std::move(node));				asio2::ignore_unused(share_name, topic_filter, _1, inserted);			}			derive.async_send(std::forward<Message>(msg));		}		inline mqtt::subscription_map<std::string_view, subnode_type>& subs_map() { return subs_map_; }	protected:		/// subscription information map		mqtt::subscription_map<std::string_view, subnode_type>      subs_map_;		/// don't need mutex, beacuse client only has one thread, we use post to ensure this		/// variable was read write in the client io_context thread.		std::unordered_map<key_type, mqtt::utf8_string_set, hasher> unsubscribed_topics_;	};}#endif // !__ASIO2_MQTT_SUBSCRIBE_ROUTER_HPP__
 |