| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969 | /* * Copyright (c) 2017-2023 zhllxt * * author   : zhllxt * email    : 37792738@qq.com *  * Distributed under the Boost Software License, Version 1.0. (See accompanying * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) */#ifndef __ASIO2_UDP_CAST_HPP__#define __ASIO2_UDP_CAST_HPP__#if defined(_MSC_VER) && (_MSC_VER >= 1200)#pragma once#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)#include <asio2/base/detail/push_options.hpp>#include <cstdint>#include <memory>#include <chrono>#include <functional>#include <atomic>#include <string>#include <string_view>#include <queue>#include <any>#include <future>#include <tuple>#include <unordered_map>#include <asio2/base/iopool.hpp>#include <asio2/base/listener.hpp>#include <asio2/base/detail/object.hpp>#include <asio2/base/detail/allocator.hpp>#include <asio2/base/detail/util.hpp>#include <asio2/base/detail/buffer_wrap.hpp>#include <asio2/base/detail/ecs.hpp>#include <asio2/base/impl/io_context_cp.hpp>#include <asio2/base/impl/thread_id_cp.hpp>#include <asio2/base/impl/alive_time_cp.hpp>#include <asio2/base/impl/user_data_cp.hpp>#include <asio2/base/impl/socket_cp.hpp>#include <asio2/base/impl/user_timer_cp.hpp>#include <asio2/base/impl/post_cp.hpp>#include <asio2/base/impl/event_queue_cp.hpp>#include <asio2/base/impl/condition_event_cp.hpp>#include <asio2/base/impl/connect_cp.hpp>#include <asio2/base/detail/linear_buffer.hpp>#include <asio2/udp/impl/udp_send_cp.hpp>#include <asio2/udp/impl/udp_send_op.hpp>//#include <asio2/component/socks/socks5_client_cp.hpp>//#include <asio2/proxy/socks5_client.hpp>namespace asio2::detail{	struct template_args_udp_cast : public udp_tag	{		using socket_t = asio::ip::udp::socket;		using buffer_t = asio2::linear_buffer;		//template<class derived_t>		//using socks5_client_t = asio2::socks5_tcp_client_t<derived_t>;		static constexpr std::size_t function_storage_size = 88;		static constexpr std::size_t allocator_storage_size = 256;	};	ASIO2_CLASS_FORWARD_DECLARE_BASE;	ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;	ASIO2_CLASS_FORWARD_DECLARE_UDP_CLIENT;	template<class derived_t, class args_t = template_args_udp_cast>	class udp_cast_impl_t		: public object_t          <derived_t        >		, public iopool_cp         <derived_t, args_t>		, public io_context_cp     <derived_t, args_t>		, public thread_id_cp      <derived_t, args_t>		, public event_queue_cp    <derived_t, args_t>		, public user_data_cp      <derived_t, args_t>		, public alive_time_cp     <derived_t, args_t>		, public socket_cp         <derived_t, args_t>		, public user_timer_cp     <derived_t, args_t>		, public post_cp           <derived_t, args_t>		, public condition_event_cp<derived_t, args_t>		, public udp_send_cp       <derived_t, args_t>		, public udp_send_op       <derived_t, args_t>		, public connect_cp_member_variables<derived_t, args_t, false>		//, public socks5_client_cp  <derived_t, args_t>		, public udp_tag		, public cast_tag	{		ASIO2_CLASS_FRIEND_DECLARE_BASE;		ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;		ASIO2_CLASS_FRIEND_DECLARE_UDP_CLIENT;	public:		using super = object_t       <derived_t        >;		using self  = udp_cast_impl_t<derived_t, args_t>;		using iopoolcp = iopool_cp   <derived_t, args_t>;		using args_type   = args_t;		using buffer_type = typename args_t::buffer_t;		/**		 * @brief constructor		 * @throws maybe throw exception "Too many open files" (exception code : 24)		 * asio::error::no_descriptors - Too many open files		 */		explicit udp_cast_impl_t(			std::size_t init_buf_size = udp_frame_size,			std::size_t max_buf_size  = max_buffer_size,			std::size_t concurrency   = 1		)			: super()			, iopool_cp         <derived_t, args_t>(concurrency)			, io_context_cp     <derived_t, args_t>(iopoolcp::_get_io(0))			, event_queue_cp    <derived_t, args_t>()			, user_data_cp      <derived_t, args_t>()			, alive_time_cp     <derived_t, args_t>()			, socket_cp         <derived_t, args_t>(iopoolcp::_get_io(0)->context())			, user_timer_cp     <derived_t, args_t>()			, post_cp           <derived_t, args_t>()			, condition_event_cp<derived_t, args_t>()			, udp_send_cp       <derived_t, args_t>()			, udp_send_op       <derived_t, args_t>()			, rallocator_()			, wallocator_()			, listener_  ()			, buffer_    (init_buf_size, max_buf_size)		{		}		template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>		explicit udp_cast_impl_t(			std::size_t init_buf_size,			std::size_t max_buf_size,			Scheduler && scheduler		)			: super()			, iopool_cp         <derived_t, args_t>(std::forward<Scheduler>(scheduler))			, io_context_cp     <derived_t, args_t>(iopoolcp::_get_io(0))			, event_queue_cp    <derived_t, args_t>()			, user_data_cp      <derived_t, args_t>()			, alive_time_cp     <derived_t, args_t>()			, socket_cp         <derived_t, args_t>(iopoolcp::_get_io(0)->context())			, user_timer_cp     <derived_t, args_t>()			, post_cp           <derived_t, args_t>()			, condition_event_cp<derived_t, args_t>()			, udp_send_cp       <derived_t, args_t>()			, udp_send_op       <derived_t, args_t>()			, rallocator_()			, wallocator_()			, listener_  ()			, buffer_    (init_buf_size, max_buf_size)		{		}		template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>		explicit udp_cast_impl_t(Scheduler&& scheduler)			: udp_cast_impl_t(udp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))		{		}		/**		 * @brief destructor		 */		~udp_cast_impl_t()		{			this->stop();		}		/**		 * @brief start the udp cast		 * @param host - A string identifying a location. May be a descriptive name or		 * a numeric address string.		 * @param service - A string identifying the requested service. This may be a		 * descriptive name or a numeric string corresponding to a port number.		 */		template<typename String, typename StrOrInt, typename... Args>		inline bool start(String&& host, StrOrInt&& service, Args&&... args)		{		#if defined(ASIO2_ENABLE_LOG)		#if defined(ASIO2_ALLOCATOR_STORAGE_SIZE)			static_assert(decltype(rallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);			static_assert(decltype(wallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);		#else			static_assert(decltype(rallocator_)::storage_size == args_t::allocator_storage_size);			static_assert(decltype(wallocator_)::storage_size == args_t::allocator_storage_size);		#endif		#endif			return this->derived().template _do_start<false>(				std::forward<String>(host), std::forward<StrOrInt>(service),				ecs_helper::make_ecs('0', std::forward<Args>(args)...));		}		/**		 * @brief start the udp cast		 * @param host - A string identifying a location. May be a descriptive name or		 * a numeric address string.		 * @param service - A string identifying the requested service. This may be a		 * descriptive name or a numeric string corresponding to a port number.		 */		template<typename String, typename StrOrInt, typename... Args>		inline bool async_start(String&& host, StrOrInt&& service, Args&&... args)		{			return this->derived().template _do_start<true>(				std::forward<String>(host), std::forward<StrOrInt>(service),				ecs_helper::make_ecs('0', std::forward<Args>(args)...));		}		/**		 * @brief stop the udp cast		 * You can call this function in the communication thread and anywhere to stop the udp cast.		 * If this function is called in the communication thread, it will post a asynchronous		 * event into the event queue, then return immediately.		 * If this function is called not in the communication thread, it will blocking forever		 * util the udp cast is stopped completed.		 */		inline void stop()		{			if (this->is_iopool_stopped())				return;			derived_t& derive = this->derived();			derive.io_->unregobj(&derive);			// use promise to get the result of stop			std::promise<state_t> promise;			std::future<state_t> future = promise.get_future();			// use derfer to ensure the promise's value must be seted.			detail::defer_event pg			{				[this, p = std::move(promise)]() mutable				{					p.set_value(this->state_.load());				}			};			derive.post_event([&derive, this_ptr = derive.selfptr(), pg = std::move(pg)]			(event_queue_guard<derived_t> g) mutable			{				derive._do_stop(asio::error::operation_aborted, std::move(this_ptr), defer_event				{					[pg = std::move(pg)](event_queue_guard<derived_t> g) mutable					{						detail::ignore_unused(pg, g);						// the "pg" should destroyed before the "g", otherwise if the "g"						// is destroyed before "pg", the next event maybe called, then the						// state maybe change to not stopped.						{							[[maybe_unused]] detail::defer_event t{ std::move(pg) };						}					}, std::move(g)				});			});			while (!derive.running_in_this_thread())			{				std::future_status status = future.wait_for(std::chrono::milliseconds(100));				if (status == std::future_status::ready)				{					ASIO2_ASSERT(future.get() == state_t::stopped);					break;				}				else				{					if (derive.get_thread_id() == std::thread::id{})						break;					if (derive.io_->context().stopped())						break;				}			}			this->stop_iopool();		}		/**		 * @brief destroy the content of all member variables, this is used for solve the memory leaks.		 * After this function is called, this class object cannot be used again.		 */		inline void destroy()		{			derived_t& derive = this->derived();			derive.socket_.reset();			derive.io_.reset();			derive.listener_.clear();			derive.destroy_iopool();		}		/**		 * @brief check whether the udp cast is started		 */		inline bool is_started() const		{			return (this->state_ == state_t::started && this->socket().is_open());		}		/**		 * @brief check whether the udp cast is stopped		 */		inline bool is_stopped() const		{			return (this->state_ == state_t::stopped && !this->socket().is_open());		}	public:		/**		 * @brief bind recv listener		 * @param fun - a user defined callback function.		 * @param obj - a pointer or reference to a class object, this parameter can be none.		 * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the		 * the class object's pointer or reference.		 * Function signature : void(asio::ip::udp::endpoint& endpoint, std::string_view data)		 */		template<class F, class ...C>		inline derived_t & bind_recv(F&& fun, C&&... obj)		{			this->listener_.bind(event_type::recv,				observer_t<asio::ip::udp::endpoint&, std::string_view>(					std::forward<F>(fun), std::forward<C>(obj)...));			return (this->derived());		}		/**		 * @brief bind init listener,we should set socket options at here		 * @param fun - a user defined callback function.		 * @param obj - a pointer or reference to a class object, this parameter can be none.		 * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the		 * the class object's pointer or reference.		 * Function signature : void()		 */		template<class F, class ...C>		inline derived_t & bind_init(F&& fun, C&&... obj)		{			this->listener_.bind(event_type::init,				observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));			return (this->derived());		}		/**		 * @brief bind start listener		 * @param fun - a user defined callback function.		 * @param obj - a pointer or reference to a class object, this parameter can be none.		 * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the		 * the class object's pointer or reference.		 * This notification is called after the server starts up, whether successful or unsuccessful		 * Function signature : void()		 */		template<class F, class ...C>		inline derived_t & bind_start(F&& fun, C&&... obj)		{			this->listener_.bind(event_type::start, observer_t<>(				std::forward<F>(fun), std::forward<C>(obj)...));			return (this->derived());		}		/**		 * @brief bind stop listener		 * @param fun - a user defined callback function.		 * @param obj - a pointer or reference to a class object, this parameter can be none.		 * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the		 * the class object's pointer or reference.		 * This notification is called before the server is ready to stop		 * Function signature : void()		 */		template<class F, class ...C>		inline derived_t & bind_stop(F&& fun, C&&... obj)		{			this->listener_.bind(event_type::stop, observer_t<>(				std::forward<F>(fun), std::forward<C>(obj)...));			return (this->derived());		}	protected:		template<bool IsAsync, typename String, typename StrOrInt, typename C>		bool _do_start(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)		{			derived_t& derive = this->derived();			// if log is enabled, init the log first, otherwise when "Too many open files" error occurs,			// the log file will be created failed too.		#if defined(ASIO2_ENABLE_LOG)			asio2::detail::get_logger();		#endif			this->start_iopool();			if (!this->is_iopool_started())			{				set_last_error(asio::error::operation_aborted);				return false;			}			asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable			{				detail::ignore_unused(this_ptr);				// init the running thread id 				derive.io_->init_thread_id();			});			// use promise to get the result of async connect			std::promise<error_code> promise;			std::future<error_code> future = promise.get_future();			// use derfer to ensure the promise's value must be seted.			detail::defer_event pg			{				[promise = std::move(promise)]() mutable				{					promise.set_value(get_last_error());				}			};			derive.post_event(			[this, this_ptr = derive.selfptr(), ecs = std::move(ecs), pg = std::move(pg),				host = std::forward<String>(host), port = std::forward<StrOrInt>(port)]			(event_queue_guard<derived_t> g) mutable			{				derived_t& derive = this->derived();				defer_event chain				{					[pg = std::move(pg)](event_queue_guard<derived_t> g) mutable					{						detail::ignore_unused(pg, g);						// the "pg" should destroyed before the "g", otherwise if the "g"						// is destroyed before "pg", the next event maybe called, then the						// state maybe change to not stopped.						{							[[maybe_unused]] detail::defer_event t{ std::move(pg) };						}					}, std::move(g)				};				state_t expected = state_t::stopped;				if (!this->state_.compare_exchange_strong(expected, state_t::starting))				{					// if the state is not stopped, set the last error to already_started					set_last_error(asio::error::already_started);					return;				}				// must read/write ecs in the io_context thread.				derive.ecs_ = ecs;				derive.io_->regobj(&derive);			#if defined(_DEBUG) || defined(DEBUG)				this->is_stop_called_ = false;			#endif				// convert to string maybe throw some exception.				this->host_ = detail::to_string(std::move(host));				this->port_ = detail::to_string(std::move(port));				expected = state_t::starting;				if (!this->state_.compare_exchange_strong(expected, state_t::starting))				{					ASIO2_ASSERT(false);					derive._handle_connect(asio::error::operation_aborted,						std::move(this_ptr), std::move(ecs), std::move(chain));					return;				}				error_code ec, ec_ignore;				this->socket().cancel(ec_ignore);				this->socket().close(ec_ignore);				// parse address and port				asio::ip::udp::resolver resolver(this->io_->context());				auto results = resolver.resolve(this->host_, this->port_,					asio::ip::resolver_base::flags::passive |					asio::ip::resolver_base::flags::address_configured, ec);				if (ec)				{					derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));					return;				}				if (results.empty())				{					derive._handle_connect(asio::error::host_not_found,						std::move(this_ptr), std::move(ecs), std::move(chain));					return;				}				asio::ip::udp::endpoint endpoint = *results.begin();				this->socket().open(endpoint.protocol(), ec);				if (ec)				{					derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));					return;				}				// when you close socket in linux system,and start socket				// immediate,you will get like this "the address is in use",				// and bind is failed,but i'm suer i close the socket correct				// already before,why does this happen? the reasion is the				// socket option "TIME_WAIT",although you close the socket,				// but the system not release the socket,util 2~4 seconds later,				// so we can use the SO_REUSEADDR option to avoid this problem,				// like below				// set port reuse				this->socket().set_option(asio::ip::udp::socket::reuse_address(true), ec_ignore);				//derive._socks5_init(ecs);				clear_last_error();				derive._fire_init();				this->socket().bind(endpoint, ec);				if (ec)				{					derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));					return;				}				derive._post_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));			});			if constexpr (IsAsync)			{				set_last_error(asio::error::in_progress);				return true;			}			else			{				if (!derive.io_->running_in_this_thread())				{					set_last_error(future.get());					return static_cast<bool>(!get_last_error());				}				else				{					set_last_error(asio::error::in_progress);				}				// if the state is stopped , the return value is "is_started()".				// if the state is stopping, the return value is false, the last error is already_started				// if the state is starting, the return value is false, the last error is already_started				// if the state is started , the return value is true , the last error is already_started				return derive.is_started();			}		}		template<typename C, typename DeferEvent>		inline void _post_proxy(			const error_code& ec,			std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)		{			set_last_error(ec);			derived_t& derive = static_cast<derived_t&>(*this);			ASIO2_ASSERT(derive.io_->running_in_this_thread());			//if constexpr (std::is_base_of_v<component_tag, detail::remove_cvref_t<C>>)			//{			//	if (ec)			//		return derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));			//	if constexpr (C::has_socks5())			//		derive._socks5_start(std::move(this_ptr), std::move(ecs), std::move(chain));			//	else			//		derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));			//}			//else			{				derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));			}		}		template<typename C, typename DeferEvent>		inline void _handle_proxy(			const error_code& ec,			std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)		{			set_last_error(ec);			derived_t& derive = static_cast<derived_t&>(*this);			derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));		}		template<typename C, typename DeferEvent>		inline void _handle_connect(			const error_code& ec,			std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)		{			set_last_error(ec);			this->derived()._handle_start(ec, std::move(this_ptr), std::move(ecs), std::move(chain));		}		template<typename C, typename DeferEvent>		inline void _handle_start(			error_code ec,			std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)		{			ASIO2_ASSERT(this->derived().io_->running_in_this_thread());			// Whether the startup succeeds or fails, always call fire_start notification			state_t expected = state_t::starting;			if (!ec)				if (!this->state_.compare_exchange_strong(expected, state_t::started))					ec = asio::error::operation_aborted;			set_last_error(ec);			this->derived()._fire_start();			expected = state_t::started;			if (!ec)				if (!this->state_.compare_exchange_strong(expected, state_t::started))					ec = asio::error::operation_aborted;			if (ec)			{				this->derived()._do_stop(ec, std::move(this_ptr), std::move(chain));				return;			}			this->buffer_.consume(this->buffer_.size());			this->derived()._post_recv(std::move(this_ptr), std::move(ecs));		}		template<typename E = defer_event<void, derived_t>>		inline void _do_disconnect(			const error_code& ec, std::shared_ptr<derived_t> this_ptr, E chain = defer_event<void, derived_t>{})		{			derived_t& derive = this->derived();			derive.dispatch([&derive, ec, this_ptr = std::move(this_ptr), chain = std::move(chain)]() mutable			{				derive._do_stop(ec, std::move(this_ptr), std::move(chain));			});		}		template<typename E = defer_event<void, derived_t>>		inline void _do_stop(			const error_code& ec, std::shared_ptr<derived_t> this_ptr, E chain = defer_event<void, derived_t>{})		{			ASIO2_ASSERT(this->derived().io_->running_in_this_thread());			state_t expected = state_t::starting;			if (this->state_.compare_exchange_strong(expected, state_t::stopping))				return this->derived()._post_stop(ec, std::move(this_ptr), expected, std::move(chain));			expected = state_t::started;			if (this->state_.compare_exchange_strong(expected, state_t::stopping))				return this->derived()._post_stop(ec, std::move(this_ptr), expected, std::move(chain));		}		template<typename DeferEvent>		inline void _post_stop(			const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state, DeferEvent chain)		{			// psot a recv signal to ensure that all recv events has finished already.			this->derived().disp_event(			[this, ec, this_ptr = std::move(this_ptr), old_state, e = chain.move_event()]			(event_queue_guard<derived_t> g) mutable			{				detail::ignore_unused(old_state);				// When the code runs here,no new session can be emplace or erase to session_mgr.				// stop all the sessions, the session::stop must be no blocking,				// otherwise it may be cause loop lock.				set_last_error(ec);				defer_event chain(std::move(e), std::move(g));				state_t expected = state_t::stopping;				if (this->state_.compare_exchange_strong(expected, state_t::stopped))				{					this->derived()._fire_stop();					// call CRTP polymorphic stop					this->derived()._handle_stop(ec, std::move(this_ptr), std::move(chain));				}				else				{					ASIO2_ASSERT(false);				}			}, chain.move_guard());		}		template<typename DeferEvent>		inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)		{			derived_t& derive = this->derived();			detail::ignore_unused(ec, this_ptr, chain);			ASIO2_ASSERT(derive.io_->running_in_this_thread());			// close user custom timers			derive._dispatch_stop_all_timers();			// close all posted timed tasks			derive._dispatch_stop_all_timed_events();			// close all async_events			derive.notify_all_condition_events();			//derive._socks5_stop();			error_code ec_ignore{};			// call socket's close function to notify the _handle_recv function			// response with error > 0 ,then the socket can get notify to exit			// Call shutdown() to indicate that you will not write any more data to the socket.			derive.socket().shutdown(asio::socket_base::shutdown_both, ec_ignore);			derive.socket().cancel(ec_ignore);			// Call close,otherwise the _handle_recv will never return			derive.socket().close(ec_ignore);			// clear recv buffer			derive.buffer().consume(derive.buffer().size());			// destroy user data, maybe the user data is self shared_ptr,			// if don't destroy it, will cause loop reference.			derive.user_data_.reset();			// destroy the ecs			derive.ecs_.reset();			//			derive.reset_life_id();		}	protected:		template<class Endpoint, class Data, class Callback>		inline bool _do_send(Endpoint& endpoint, Data& data, Callback&& callback)		{			return this->derived()._udp_send_to(endpoint, data, std::forward<Callback>(callback));		}	protected:		template<typename C>		void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)		{			if (!this->is_started())			{				if (this->derived().state_ == state_t::started)				{					this->derived()._do_stop(asio2::get_last_error(), std::move(this_ptr));				}				return;			}		#if defined(_DEBUG) || defined(DEBUG)			ASIO2_ASSERT(this->derived().post_recv_counter_.load() == 0);			this->derived().post_recv_counter_++;		#endif			this->socket().async_receive_from(				this->buffer_.prepare(this->buffer_.pre_size()),				this->remote_endpoint_,				make_allocator(this->rallocator_,			[this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]			(const error_code& ec, std::size_t bytes_recvd) mutable			{			#if defined(_DEBUG) || defined(DEBUG)				this->derived().post_recv_counter_--;			#endif				this->derived()._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));			}));		}		template<typename C>		void _handle_recv(			const error_code& ec, std::size_t bytes_recvd,			std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)		{			set_last_error(ec);			if (!this->derived().is_started())			{				if (this->derived().state_ == state_t::started)				{					this->derived()._do_stop(ec, std::move(this_ptr));				}				return;			}			if (ec == asio::error::operation_aborted)			{				this->derived()._do_stop(ec, std::move(this_ptr));				return;			}			this->buffer_.commit(bytes_recvd);			if (!ec)			{				this->derived()._fire_recv(this_ptr, ecs,					std::string_view(static_cast<std::string_view::const_pointer>(						this->buffer_.data().data()), bytes_recvd));			}			this->buffer_.consume(this->buffer_.size());			if (bytes_recvd == this->buffer_.pre_size())			{				this->buffer_.pre_size((std::min)(this->buffer_.pre_size() * 2, this->buffer_.max_size()));			}			this->derived()._post_recv(std::move(this_ptr), std::move(ecs));		}		inline void _fire_init()		{			// the _fire_init must be executed in the thread 0.			ASIO2_ASSERT(this->derived().io_->running_in_this_thread());			ASIO2_ASSERT(!get_last_error());			this->listener_.notify(event_type::init);		}		template<typename C>		inline void _fire_recv(			std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)		{			detail::ignore_unused(this_ptr, ecs);			data = detail::call_data_filter_before_recv(this->derived(), data);			this->listener_.notify(event_type::recv, this->remote_endpoint_, data);		}		inline void _fire_start()		{			// the _fire_start must be executed in the thread 0.			ASIO2_ASSERT(this->derived().io_->running_in_this_thread());		#if defined(_DEBUG) || defined(DEBUG)			ASIO2_ASSERT(this->is_stop_called_ == false);		#endif			this->listener_.notify(event_type::start);		}		inline void _fire_stop()		{			// the _fire_stop must be executed in the thread 0.			ASIO2_ASSERT(this->derived().io_->running_in_this_thread());		#if defined(_DEBUG) || defined(DEBUG)			this->is_stop_called_ = true;		#endif			this->listener_.notify(event_type::stop);		}	public:		/**		 * @brief get the buffer object reference		 */		inline buffer_wrap<buffer_type> & buffer() noexcept { return this->buffer_; }	protected:		/**		 * @brief get the recv/read allocator object reference		 */		inline auto & rallocator() noexcept { return this->rallocator_; }		/**		 * @brief get the send/write allocator object reference		 */		inline auto & wallocator() noexcept { return this->wallocator_; }		inline const char*                  life_id () noexcept { return this->life_id_.get(); }		inline void                   reset_life_id () noexcept { this->life_id_ = std::make_unique<char>(); }	protected:		/// The memory to use for handler-based custom memory allocation. used fo recv/read.		handler_memory<std::true_type , assizer<args_t>>   rallocator_;		/// The memory to use for handler-based custom memory allocation. used fo send/write.		handler_memory<std::false_type, assizer<args_t>>   wallocator_;		/// listener 		listener_t                                  listener_;		/// buffer		buffer_wrap<buffer_type>                    buffer_;		/// state		std::atomic<state_t>                        state_ = state_t::stopped;		/// the pointer of ecs_t		std::shared_ptr<ecs_base>                   ecs_;		/// @see client life id		std::unique_ptr<char>                       life_id_ = std::make_unique<char>();	#if defined(_DEBUG) || defined(DEBUG)		bool                                        is_stop_called_  = false;		std::atomic<int>                            post_send_counter_ = 0;		std::atomic<int>                            post_recv_counter_ = 0;	#endif	};}namespace asio2{	using udp_cast_args = detail::template_args_udp_cast;	template<class derived_t, class args_t>	using udp_cast_impl_t = detail::udp_cast_impl_t<derived_t, args_t>;	/**	 * udp unicast/multicast/broadcast	 * @throws constructor maybe throw exception "Too many open files" (exception code : 24)	 * asio::error::no_descriptors - Too many open files	 */	template<class derived_t>	class udp_cast_t : public detail::udp_cast_impl_t<derived_t, detail::template_args_udp_cast>	{	public:		using detail::udp_cast_impl_t<derived_t, detail::template_args_udp_cast>::udp_cast_impl_t;	};	/*	 * udp unicast/multicast/broadcast	 * If this object is created as a shared_ptr like std::shared_ptr<asio2::udp_cast> cast;	 * you must call the cast->stop() manual when exit, otherwise maybe cause memory leaks.	 * @throws constructor maybe throw exception "Too many open files" (exception code : 24)	 * asio::error::no_descriptors - Too many open files	 */	class udp_cast : public udp_cast_t<udp_cast>	{	public:		using udp_cast_t<udp_cast>::udp_cast_t;	};}#include <asio2/base/detail/pop_options.hpp>#endif // !__ASIO2_UDP_CAST_HPP__
 |