| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543 | /* * Copyright (c) 2017-2023 zhllxt * * author   : zhllxt * email    : 37792738@qq.com *  * Distributed under the Boost Software License, Version 1.0. (See accompanying * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) */#ifndef __ASIO2_MQTT_AOP_CONNECT_HPP__#define __ASIO2_MQTT_AOP_CONNECT_HPP__#if defined(_MSC_VER) && (_MSC_VER >= 1200)#pragma once#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)#include <asio2/base/iopool.hpp>#include <asio2/base/detail/function_traits.hpp>#include <asio2/base/detail/util.hpp>#include <asio2/mqtt/message.hpp>namespace asio2::detail{	template<class caller_t, class args_t>	class mqtt_aop_connect	{		friend caller_t;	protected:		template<class Response>		inline void _set_connack_reason_code_with_invalid_id(error_code& ec, Response& rep)		{			using response_type = typename detail::remove_cvref_t<Response>;			ec = mqtt::make_error_code(mqtt::error::client_identifier_not_valid);			if constexpr /**/ (std::is_same_v<response_type, mqtt::v3::connack>)			{				rep.reason_code(mqtt::v3::connect_reason_code::identifier_rejected);			}			else if constexpr (std::is_same_v<response_type, mqtt::v4::connack>)			{				rep.reason_code(mqtt::v4::connect_reason_code::identifier_rejected);			}			else if constexpr (std::is_same_v<response_type, mqtt::v5::connack>)			{				rep.reason_code(mqtt::error::client_identifier_not_valid);			}			else			{				static_assert(detail::always_false_v<Response>);			}		}		template<class Response>		inline void _set_connack_reason_code_with_bad_user_name_or_password(error_code& ec, Response& rep)		{			using response_type = typename detail::remove_cvref_t<Response>;			ec = mqtt::make_error_code(mqtt::error::bad_user_name_or_password);			if constexpr /**/ (std::is_same_v<response_type, mqtt::v3::connack>)			{				rep.reason_code(mqtt::v3::connect_reason_code::bad_user_name_or_password);			}			else if constexpr (std::is_same_v<response_type, mqtt::v4::connack>)			{				rep.reason_code(mqtt::v4::connect_reason_code::bad_user_name_or_password);			}			else if constexpr (std::is_same_v<response_type, mqtt::v5::connack>)			{				rep.reason_code(mqtt::error::bad_user_name_or_password);			}			else			{				static_assert(detail::always_false_v<Response>);			}		}		template<class Response>		inline void _set_connack_reason_code_with_not_authorized(error_code& ec, Response& rep)		{			using response_type = typename detail::remove_cvref_t<Response>;			ec = mqtt::make_error_code(mqtt::error::not_authorized);			if constexpr /**/ (std::is_same_v<response_type, mqtt::v3::connack>)			{				rep.reason_code(mqtt::v3::connect_reason_code::not_authorized);			}			else if constexpr (std::is_same_v<response_type, mqtt::v4::connack>)			{				rep.reason_code(mqtt::v4::connect_reason_code::not_authorized);			}			else if constexpr (std::is_same_v<response_type, mqtt::v5::connack>)			{				rep.reason_code(mqtt::error::not_authorized);			}			else			{				static_assert(detail::always_false_v<Response>);			}		}		template<class Message, class Response>		inline void _check_connect_client_id(error_code& ec, caller_t* caller, Message& msg, Response& rep)		{			using message_type  [[maybe_unused]] = typename detail::remove_cvref_t<Message>;			using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;			std::string_view client_id = msg.client_id();			if constexpr /**/ (std::is_same_v<response_type, mqtt::v3::connack>)			{				// The first UTF-encoded string. The Client Identifier (Client ID) is between 1 and 23				// characters long, and uniquely identifies the client to the server. It must be unique				// across all clients connecting to a single server, and is the key in handling Message				// IDs messages with QoS levels 1 and 2. If the Client ID contains more than 23 characters,				// the server responds to the CONNECT message with a CONNACK return code 2: Identifier Rejected.				if (client_id.size() < static_cast<std::string_view::size_type>(1) ||					client_id.size() > static_cast<std::string_view::size_type>(23))					return _set_connack_reason_code_with_invalid_id(ec, rep);			}			else if constexpr (std::is_same_v<response_type, mqtt::v4::connack>)			{				// If the Client supplies a zero-byte ClientId, the Client MUST also set CleanSession 				// to 1[MQTT-3.1.3-7].				// If the Client supplies a zero-byte ClientId with CleanSession set to 0, the Server 				// MUST respond to the CONNECT Packet with a CONNACK return code 0x02 (Identifier rejected) 				// and then close the Network Connection[MQTT-3.1.3-8].				// If the Server rejects the ClientId it MUST respond to the CONNECT Packet with a CONNACK				// return code 0x02 (Identifier rejected) and then close the Network Connection[MQTT-3.1.3-9].				if (client_id.empty())				{					if (msg.clean_session() == false)						return _set_connack_reason_code_with_invalid_id(ec, rep);					// assign a unique ClientId to that Client.					msg.client_id(std::to_string(reinterpret_cast<std::size_t>(caller)));				}			}			else if constexpr (std::is_same_v<response_type, mqtt::v5::connack>)			{				// and MUST return the Assigned Client Identifier in the CONNACK packet [MQTT-3.1.3-7].				if (client_id.empty())				{					// assign a unique ClientId to that Client.					msg.client_id(std::to_string(reinterpret_cast<std::size_t>(caller)));					rep.properties().add(mqtt::v5::assigned_client_identifier(msg.client_id()));				}			}			else			{				static_assert(detail::always_false_v<Response>);			}			// check whether uniquely identifier		}		// must be server		template<class Message, class Response, bool IsClient = args_t::is_client>		inline std::enable_if_t<!IsClient, void>		_before_connect_callback(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			Message& msg, Response& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			using message_type  [[maybe_unused]] = typename detail::remove_cvref_t<Message>;			using response_type [[maybe_unused]] = typename detail::remove_cvref_t<Response>;			// if started already and recvd connect message again, disconnect			state_t expected = state_t::started;			if (caller->state_.compare_exchange_strong(expected, state_t::started))			{				ec = mqtt::make_error_code(mqtt::error::malformed_packet);				rep.set_send_flag(false);				return;			}			// mqtt auth			if (caller->security().enabled())			{				std::optional<std::string> username;				if (caller->get_preauthed_username())				{					if (caller->security().login_cert(caller->get_preauthed_username().value()))					{						username = caller->get_preauthed_username();					}				}				else if (msg.has_username() && msg.has_password())				{					username = caller->security().login(msg.username(), msg.password());				}				else				{					username = caller->security().login_anonymous();				}				// If login fails, try the unauthenticated user				if (!username)				{					username = caller->security().login_unauthenticated();				}				if (!username)				{					ASIO2_LOG_TRACE("User failed to login: {}",						(msg.has_username() ? msg.username() : "anonymous user"));					if (msg.has_username() && msg.has_password())						return _set_connack_reason_code_with_bad_user_name_or_password(ec, rep);					return _set_connack_reason_code_with_not_authorized(ec, rep);				}			}			// check client id			if (_check_connect_client_id(ec, caller, msg, rep); ec)				return;			// set keepalive timeout			// If the Keep Alive value is non-zero and the Server does not receive a Control Packet from			// the Client within one and a half times the Keep Alive time period, it MUST disconnect the			// Network Connection to the Client as if the network had failed [MQTT-3.1.2-24].			// A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.			// This means that, in this case, the Server is not required to disconnect the Client on the			// grounds of inactivity. Note that a Server is permitted to disconnect a Client that it			// determines to be inactive or non-responsive at any time, regardless of the Keep Alive value			// provided by that Client.			mqtt::two_byte_integer::value_type keepalive = msg.keep_alive();			if (keepalive == 0)			{				caller->set_silence_timeout(std::chrono::milliseconds(detail::tcp_silence_timeout));			}			else			{				caller->set_silence_timeout(std::chrono::milliseconds(keepalive * 1500));			}			// fill the reason code with 0.			rep.reason_code(0);			caller->connect_message_ = msg;			// If a client with the same Client ID is already connected to the server, the "older" client			// must be disconnected by the server before completing the CONNECT flow of the new client.			// If CleanSession is set to 0, the Server MUST resume communications with the Client based on state			// from the current Session (as identified by the Client identifier).			// If there is no Session associated with the Client identifier the Server MUST create a new Session.			// The Client and Server MUST store the Session after the Client and Server are disconnected [MQTT-3.1.2-4].			// After the disconnection of a Session that had CleanSession set to 0, the Server MUST store further			// QoS 1 and QoS 2 messages that match any subscriptions that the client had at the time of disconnection			// as part of the Session state [MQTT-3.1.2-5]. It MAY also store QoS 0 messages that meet the same criteria.			// If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one.			// This Session lasts as long as the Network Connection.State data associated with this Session MUST NOT be			// reused in any subsequent Session[MQTT-3.1.2-6].			// If a CONNECT packet is received with Clean Start is set to 1, the Client and Server MUST discard any			// existing Session and start a new Session [MQTT-3.1.2-4]. Consequently, the Session Present flag in			// CONNACK is always set to 0 if Clean Start is set to 1.			// If a CONNECT packet is received with Clean Start set to 0 and there is a Session associated with the			// Client Identifier, the Server MUST resume communications with the Client based on state from the existing			// Session[MQTT-3.1.2-5].If a CONNECT packet is received with Clean Start set to 0 and there is no Session			// associated with the Client Identifier, the Server MUST create a new Session[MQTT-3.1.2-6].			std::shared_ptr<caller_t> session_ptr = caller->mqtt_sessions().find_mqtt_session(caller->client_id());			// If the Server accepts a connection with Clean Start set to 1, the Server MUST set Session			// Present to 0 in the CONNACK packet in addition to setting a 0x00 (Success) Reason Code in			// the CONNACK packet [MQTT-3.2.2-2].			if (msg.clean_session())				rep.session_present(false);			// If the Server accepts a connection with Clean Start set to 0 and the Server has Session 			// State for the ClientID, it MUST set Session Present to 1 in the CONNACK packet, otherwise			// it MUST set Session Present to 0 in the CONNACK packet. In both cases it MUST set a 0x00			// (Success) Reason Code in the CONNACK packet [MQTT-3.2.2-3].			else				rep.session_present(session_ptr != nullptr);			if (session_ptr == nullptr)			{				caller->mqtt_sessions().push_mqtt_session(caller->client_id(), caller_ptr);				session_ptr = caller_ptr;			}			else			{				if (session_ptr->is_started())				{					// send will message					if (!session_ptr->connect_message_.empty())					{						auto f = [caller_ptr, caller](auto& conn) mutable						{							if (!conn.has_will())								return;							// note : why v5 ?							mqtt::v5::publish pub;							pub.qos(conn.will_qos());							pub.retain(conn.will_retain());							pub.topic_name(conn.will_topic());							pub.payload(conn.will_payload());							caller->push_event(							[caller_ptr, caller, pub = std::move(pub)]							(event_queue_guard<caller_t> g) mutable							{								detail::ignore_unused(g);								caller->_multicast_publish(caller_ptr, caller, std::move(pub), std::string{});							});						};						if /**/ (std::holds_alternative<mqtt::v3::connect>(session_ptr->connect_message_.base()))						{							mqtt::v3::connect* p = session_ptr->connect_message_.template get_if<mqtt::v3::connect>();							f(*p);						}						else if (std::holds_alternative<mqtt::v4::connect>(session_ptr->connect_message_.base()))						{							mqtt::v4::connect* p = session_ptr->connect_message_.template get_if<mqtt::v4::connect>();							f(*p);						}						else if (std::holds_alternative<mqtt::v5::connect>(session_ptr->connect_message_.base()))						{							mqtt::v5::connect* p = session_ptr->connect_message_.template get_if<mqtt::v5::connect>();							f(*p);						}					}					// disconnect session					// In previous versions, the mutex is a global variable, when code reached here,					// the mutex status is locked already, so if we call session_ptr->stop()					// directly, the stop maybe blocked forever, beacuse the session1 stop maybe					// called in the session2 thread, and in the handle disconnect funcion of mqtt					// session, the global mutex is required to lock again, so it caused deaklock,					// to solve this problem, we can use session->post([](){session->stop()}).					// but now, we change the mutex from global variable to member variable for each					// mqtt class, so now, it won't caused deaklock, even we only use session->stop,					// but we still use session->post([](){session->stop()}), to enhanced stability.					session_ptr->post([session_ptr]() mutable { session_ptr->stop(); });					// 					bool clean_session = false;					if /**/ (std::holds_alternative<mqtt::v3::connect>(session_ptr->connect_message_.base()))					{						clean_session = session_ptr->connect_message_.template get_if<mqtt::v3::connect>()->clean_session();					}					else if (std::holds_alternative<mqtt::v4::connect>(session_ptr->connect_message_.base()))					{						clean_session = session_ptr->connect_message_.template get_if<mqtt::v4::connect>()->clean_session();					}					else if (std::holds_alternative<mqtt::v5::connect>(session_ptr->connect_message_.base()))					{						clean_session = session_ptr->connect_message_.template get_if<mqtt::v5::connect>()->clean_session();					}					if (clean_session)					{					}					else					{					}					if (msg.clean_session())					{					}					else					{						// copy session state from old session to new session					}				}				else				{				}				// replace old session to new session				session_ptr = caller_ptr;			}		}		template<class Message, class Response, bool IsClient = args_t::is_client>		inline std::enable_if_t<IsClient, void>		_before_connect_callback(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			Message& msg, Response& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			ASIO2_ASSERT(false && "client should't recv the connect message");		}		// must be server		inline void _before_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v3::connect& msg, mqtt::v3::connack& rep)		{			if (_before_connect_callback(ec, caller_ptr, caller, om, msg, rep); ec)				return;		}		// must be server		inline void _before_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v4::connect& msg, mqtt::v4::connack& rep)		{			if (_before_connect_callback(ec, caller_ptr, caller, om, msg, rep); ec)				return;		}		// must be server		inline void _before_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v5::connect& msg, mqtt::v5::connack& rep)		{			if (_before_connect_callback(ec, caller_ptr, caller, om, msg, rep); ec)				return;		}		// must be server		inline void _before_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v5::connect& msg, mqtt::v5::auth& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			//if constexpr (caller_t::is_session())			//{			//	// if started already and recvd connect message again, disconnect			//	state_t expected = state_t::started;			//	if (caller->state_.compare_exchange_strong(expected, state_t::started))			//	{			//		ec = mqtt::make_error_code(mqtt::error::malformed_packet);			//		rep.set_send_flag(false);			//		return;			//	}			//	caller->connect_message_ = msg;			//	std::shared_ptr<caller_t> session_ptr = caller->mqtt_sessions().find_mqtt_session(msg.client_id());			//	if (session_ptr != nullptr)			//	{			//	}			//}			//else			//{			//	std::ignore = true;			//}		}		inline void _after_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v3::connect& msg, mqtt::v3::connack& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			// if already has error, return			if (ec)				return;			// If a client with the same Client ID is already connected to the server, the "older" client			// must be disconnected by the server before completing the CONNECT flow of the new client.			switch(rep.reason_code())			{			case mqtt::v3::connect_reason_code::success                       : ec = mqtt::make_error_code(mqtt::error::success                     ); break;			case mqtt::v3::connect_reason_code::unacceptable_protocol_version : ec = mqtt::make_error_code(mqtt::error::unsupported_protocol_version); break;			case mqtt::v3::connect_reason_code::identifier_rejected			  : ec = mqtt::make_error_code(mqtt::error::client_identifier_not_valid ); break;			case mqtt::v3::connect_reason_code::server_unavailable			  : ec = mqtt::make_error_code(mqtt::error::server_unavailable          ); break;			case mqtt::v3::connect_reason_code::bad_user_name_or_password	  : ec = mqtt::make_error_code(mqtt::error::bad_user_name_or_password   ); break;			case mqtt::v3::connect_reason_code::not_authorized				  : ec = mqtt::make_error_code(mqtt::error::not_authorized              ); break;			default                                                           : ec = mqtt::make_error_code(mqtt::error::malformed_packet            ); break;			}		}		inline void _after_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v4::connect& msg, mqtt::v4::connack& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			// if already has error, return			if (ec)				return;			switch(rep.reason_code())			{			case mqtt::v4::connect_reason_code::success						  : ec = mqtt::make_error_code(mqtt::error::success                     ); break;			case mqtt::v4::connect_reason_code::unacceptable_protocol_version : ec = mqtt::make_error_code(mqtt::error::unsupported_protocol_version); break;			case mqtt::v4::connect_reason_code::identifier_rejected			  : ec = mqtt::make_error_code(mqtt::error::client_identifier_not_valid ); break;			case mqtt::v4::connect_reason_code::server_unavailable			  : ec = mqtt::make_error_code(mqtt::error::server_unavailable          ); break;			case mqtt::v4::connect_reason_code::bad_user_name_or_password	  : ec = mqtt::make_error_code(mqtt::error::bad_user_name_or_password   ); break;			case mqtt::v4::connect_reason_code::not_authorized				  : ec = mqtt::make_error_code(mqtt::error::not_authorized              ); break;			default                                                           : ec = mqtt::make_error_code(mqtt::error::malformed_packet            ); break;			}		}		inline void _after_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v5::connect& msg, mqtt::v5::connack& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			// if already has error, return			if (ec)				return;			ec = mqtt::make_error_code(static_cast<mqtt::error>(rep.reason_code()));			if (!ec)			{				if (rep.properties().has<mqtt::v5::topic_alias_maximum>() == false)					rep.properties().add(mqtt::v5::topic_alias_maximum{ caller->topic_alias_maximum() });			}		}		inline void _after_user_callback_impl(			error_code& ec, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller, mqtt::message& om,			mqtt::v5::connect& msg, mqtt::v5::auth& rep)		{			detail::ignore_unused(ec, caller_ptr, caller, om, msg, rep);			// if already has error, return			if (ec)				return;		}	};}#endif // !__ASIO2_MQTT_AOP_CONNECT_HPP__
 |