udp_cast.hpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_UDP_CAST_HPP__
  11. #define __ASIO2_UDP_CAST_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <cstdint>
  17. #include <memory>
  18. #include <chrono>
  19. #include <functional>
  20. #include <atomic>
  21. #include <string>
  22. #include <string_view>
  23. #include <queue>
  24. #include <any>
  25. #include <future>
  26. #include <tuple>
  27. #include <unordered_map>
  28. #include <asio2/base/iopool.hpp>
  29. #include <asio2/base/listener.hpp>
  30. #include <asio2/base/detail/object.hpp>
  31. #include <asio2/base/detail/allocator.hpp>
  32. #include <asio2/base/detail/util.hpp>
  33. #include <asio2/base/detail/buffer_wrap.hpp>
  34. #include <asio2/base/detail/ecs.hpp>
  35. #include <asio2/base/impl/io_context_cp.hpp>
  36. #include <asio2/base/impl/thread_id_cp.hpp>
  37. #include <asio2/base/impl/alive_time_cp.hpp>
  38. #include <asio2/base/impl/user_data_cp.hpp>
  39. #include <asio2/base/impl/socket_cp.hpp>
  40. #include <asio2/base/impl/user_timer_cp.hpp>
  41. #include <asio2/base/impl/post_cp.hpp>
  42. #include <asio2/base/impl/event_queue_cp.hpp>
  43. #include <asio2/base/impl/condition_event_cp.hpp>
  44. #include <asio2/base/impl/connect_cp.hpp>
  45. #include <asio2/base/detail/linear_buffer.hpp>
  46. #include <asio2/udp/impl/udp_send_cp.hpp>
  47. #include <asio2/udp/impl/udp_send_op.hpp>
  48. //#include <asio2/component/socks/socks5_client_cp.hpp>
  49. //#include <asio2/proxy/socks5_client.hpp>
  50. namespace asio2::detail
  51. {
  52. struct template_args_udp_cast : public udp_tag
  53. {
  54. using socket_t = asio::ip::udp::socket;
  55. using buffer_t = asio2::linear_buffer;
  56. //template<class derived_t>
  57. //using socks5_client_t = asio2::socks5_tcp_client_t<derived_t>;
  58. static constexpr std::size_t function_storage_size = 88;
  59. static constexpr std::size_t allocator_storage_size = 256;
  60. };
  61. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  62. ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;
  63. ASIO2_CLASS_FORWARD_DECLARE_UDP_CLIENT;
  64. template<class derived_t, class args_t = template_args_udp_cast>
  65. class udp_cast_impl_t
  66. : public object_t <derived_t >
  67. , public iopool_cp <derived_t, args_t>
  68. , public io_context_cp <derived_t, args_t>
  69. , public thread_id_cp <derived_t, args_t>
  70. , public event_queue_cp <derived_t, args_t>
  71. , public user_data_cp <derived_t, args_t>
  72. , public alive_time_cp <derived_t, args_t>
  73. , public socket_cp <derived_t, args_t>
  74. , public user_timer_cp <derived_t, args_t>
  75. , public post_cp <derived_t, args_t>
  76. , public condition_event_cp<derived_t, args_t>
  77. , public udp_send_cp <derived_t, args_t>
  78. , public udp_send_op <derived_t, args_t>
  79. , public connect_cp_member_variables<derived_t, args_t, false>
  80. //, public socks5_client_cp <derived_t, args_t>
  81. , public udp_tag
  82. , public cast_tag
  83. {
  84. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  85. ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;
  86. ASIO2_CLASS_FRIEND_DECLARE_UDP_CLIENT;
  87. public:
  88. using super = object_t <derived_t >;
  89. using self = udp_cast_impl_t<derived_t, args_t>;
  90. using iopoolcp = iopool_cp <derived_t, args_t>;
  91. using args_type = args_t;
  92. using buffer_type = typename args_t::buffer_t;
  93. /**
  94. * @brief constructor
  95. * @throws maybe throw exception "Too many open files" (exception code : 24)
  96. * asio::error::no_descriptors - Too many open files
  97. */
  98. explicit udp_cast_impl_t(
  99. std::size_t init_buf_size = udp_frame_size,
  100. std::size_t max_buf_size = max_buffer_size,
  101. std::size_t concurrency = 1
  102. )
  103. : super()
  104. , iopool_cp <derived_t, args_t>(concurrency)
  105. , io_context_cp <derived_t, args_t>(iopoolcp::_get_io(0))
  106. , event_queue_cp <derived_t, args_t>()
  107. , user_data_cp <derived_t, args_t>()
  108. , alive_time_cp <derived_t, args_t>()
  109. , socket_cp <derived_t, args_t>(iopoolcp::_get_io(0)->context())
  110. , user_timer_cp <derived_t, args_t>()
  111. , post_cp <derived_t, args_t>()
  112. , condition_event_cp<derived_t, args_t>()
  113. , udp_send_cp <derived_t, args_t>()
  114. , udp_send_op <derived_t, args_t>()
  115. , rallocator_()
  116. , wallocator_()
  117. , listener_ ()
  118. , buffer_ (init_buf_size, max_buf_size)
  119. {
  120. }
  121. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  122. explicit udp_cast_impl_t(
  123. std::size_t init_buf_size,
  124. std::size_t max_buf_size,
  125. Scheduler && scheduler
  126. )
  127. : super()
  128. , iopool_cp <derived_t, args_t>(std::forward<Scheduler>(scheduler))
  129. , io_context_cp <derived_t, args_t>(iopoolcp::_get_io(0))
  130. , event_queue_cp <derived_t, args_t>()
  131. , user_data_cp <derived_t, args_t>()
  132. , alive_time_cp <derived_t, args_t>()
  133. , socket_cp <derived_t, args_t>(iopoolcp::_get_io(0)->context())
  134. , user_timer_cp <derived_t, args_t>()
  135. , post_cp <derived_t, args_t>()
  136. , condition_event_cp<derived_t, args_t>()
  137. , udp_send_cp <derived_t, args_t>()
  138. , udp_send_op <derived_t, args_t>()
  139. , rallocator_()
  140. , wallocator_()
  141. , listener_ ()
  142. , buffer_ (init_buf_size, max_buf_size)
  143. {
  144. }
  145. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  146. explicit udp_cast_impl_t(Scheduler&& scheduler)
  147. : udp_cast_impl_t(udp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))
  148. {
  149. }
  150. /**
  151. * @brief destructor
  152. */
  153. ~udp_cast_impl_t()
  154. {
  155. this->stop();
  156. }
  157. /**
  158. * @brief start the udp cast
  159. * @param host - A string identifying a location. May be a descriptive name or
  160. * a numeric address string.
  161. * @param service - A string identifying the requested service. This may be a
  162. * descriptive name or a numeric string corresponding to a port number.
  163. */
  164. template<typename String, typename StrOrInt, typename... Args>
  165. inline bool start(String&& host, StrOrInt&& service, Args&&... args)
  166. {
  167. #if defined(ASIO2_ENABLE_LOG)
  168. #if defined(ASIO2_ALLOCATOR_STORAGE_SIZE)
  169. static_assert(decltype(rallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);
  170. static_assert(decltype(wallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);
  171. #else
  172. static_assert(decltype(rallocator_)::storage_size == args_t::allocator_storage_size);
  173. static_assert(decltype(wallocator_)::storage_size == args_t::allocator_storage_size);
  174. #endif
  175. #endif
  176. return this->derived().template _do_start<false>(
  177. std::forward<String>(host), std::forward<StrOrInt>(service),
  178. ecs_helper::make_ecs('0', std::forward<Args>(args)...));
  179. }
  180. /**
  181. * @brief start the udp cast
  182. * @param host - A string identifying a location. May be a descriptive name or
  183. * a numeric address string.
  184. * @param service - A string identifying the requested service. This may be a
  185. * descriptive name or a numeric string corresponding to a port number.
  186. */
  187. template<typename String, typename StrOrInt, typename... Args>
  188. inline bool async_start(String&& host, StrOrInt&& service, Args&&... args)
  189. {
  190. return this->derived().template _do_start<true>(
  191. std::forward<String>(host), std::forward<StrOrInt>(service),
  192. ecs_helper::make_ecs('0', std::forward<Args>(args)...));
  193. }
  194. /**
  195. * @brief stop the udp cast
  196. * You can call this function in the communication thread and anywhere to stop the udp cast.
  197. * If this function is called in the communication thread, it will post a asynchronous
  198. * event into the event queue, then return immediately.
  199. * If this function is called not in the communication thread, it will blocking forever
  200. * util the udp cast is stopped completed.
  201. */
  202. inline void stop()
  203. {
  204. if (this->is_iopool_stopped())
  205. return;
  206. derived_t& derive = this->derived();
  207. derive.io_->unregobj(&derive);
  208. // use promise to get the result of stop
  209. std::promise<state_t> promise;
  210. std::future<state_t> future = promise.get_future();
  211. // use derfer to ensure the promise's value must be seted.
  212. detail::defer_event pg
  213. {
  214. [this, p = std::move(promise)]() mutable
  215. {
  216. p.set_value(this->state_.load());
  217. }
  218. };
  219. derive.post_event([&derive, this_ptr = derive.selfptr(), pg = std::move(pg)]
  220. (event_queue_guard<derived_t> g) mutable
  221. {
  222. derive._do_stop(asio::error::operation_aborted, std::move(this_ptr), defer_event
  223. {
  224. [pg = std::move(pg)](event_queue_guard<derived_t> g) mutable
  225. {
  226. detail::ignore_unused(pg, g);
  227. // the "pg" should destroyed before the "g", otherwise if the "g"
  228. // is destroyed before "pg", the next event maybe called, then the
  229. // state maybe change to not stopped.
  230. {
  231. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  232. }
  233. }, std::move(g)
  234. });
  235. });
  236. while (!derive.running_in_this_thread())
  237. {
  238. std::future_status status = future.wait_for(std::chrono::milliseconds(100));
  239. if (status == std::future_status::ready)
  240. {
  241. ASIO2_ASSERT(future.get() == state_t::stopped);
  242. break;
  243. }
  244. else
  245. {
  246. if (derive.get_thread_id() == std::thread::id{})
  247. break;
  248. if (derive.io_->context().stopped())
  249. break;
  250. }
  251. }
  252. this->stop_iopool();
  253. }
  254. /**
  255. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  256. * After this function is called, this class object cannot be used again.
  257. */
  258. inline void destroy()
  259. {
  260. derived_t& derive = this->derived();
  261. derive.socket_.reset();
  262. derive.io_.reset();
  263. derive.listener_.clear();
  264. derive.destroy_iopool();
  265. }
  266. /**
  267. * @brief check whether the udp cast is started
  268. */
  269. inline bool is_started() const
  270. {
  271. return (this->state_ == state_t::started && this->socket().is_open());
  272. }
  273. /**
  274. * @brief check whether the udp cast is stopped
  275. */
  276. inline bool is_stopped() const
  277. {
  278. return (this->state_ == state_t::stopped && !this->socket().is_open());
  279. }
  280. public:
  281. /**
  282. * @brief bind recv listener
  283. * @param fun - a user defined callback function.
  284. * @param obj - a pointer or reference to a class object, this parameter can be none.
  285. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  286. * the class object's pointer or reference.
  287. * Function signature : void(asio::ip::udp::endpoint& endpoint, std::string_view data)
  288. */
  289. template<class F, class ...C>
  290. inline derived_t & bind_recv(F&& fun, C&&... obj)
  291. {
  292. this->listener_.bind(event_type::recv,
  293. observer_t<asio::ip::udp::endpoint&, std::string_view>(
  294. std::forward<F>(fun), std::forward<C>(obj)...));
  295. return (this->derived());
  296. }
  297. /**
  298. * @brief bind init listener,we should set socket options at here
  299. * @param fun - a user defined callback function.
  300. * @param obj - a pointer or reference to a class object, this parameter can be none.
  301. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  302. * the class object's pointer or reference.
  303. * Function signature : void()
  304. */
  305. template<class F, class ...C>
  306. inline derived_t & bind_init(F&& fun, C&&... obj)
  307. {
  308. this->listener_.bind(event_type::init,
  309. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  310. return (this->derived());
  311. }
  312. /**
  313. * @brief bind start listener
  314. * @param fun - a user defined callback function.
  315. * @param obj - a pointer or reference to a class object, this parameter can be none.
  316. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  317. * the class object's pointer or reference.
  318. * This notification is called after the server starts up, whether successful or unsuccessful
  319. * Function signature : void()
  320. */
  321. template<class F, class ...C>
  322. inline derived_t & bind_start(F&& fun, C&&... obj)
  323. {
  324. this->listener_.bind(event_type::start, observer_t<>(
  325. std::forward<F>(fun), std::forward<C>(obj)...));
  326. return (this->derived());
  327. }
  328. /**
  329. * @brief bind stop listener
  330. * @param fun - a user defined callback function.
  331. * @param obj - a pointer or reference to a class object, this parameter can be none.
  332. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  333. * the class object's pointer or reference.
  334. * This notification is called before the server is ready to stop
  335. * Function signature : void()
  336. */
  337. template<class F, class ...C>
  338. inline derived_t & bind_stop(F&& fun, C&&... obj)
  339. {
  340. this->listener_.bind(event_type::stop, observer_t<>(
  341. std::forward<F>(fun), std::forward<C>(obj)...));
  342. return (this->derived());
  343. }
  344. protected:
  345. template<bool IsAsync, typename String, typename StrOrInt, typename C>
  346. bool _do_start(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
  347. {
  348. derived_t& derive = this->derived();
  349. // if log is enabled, init the log first, otherwise when "Too many open files" error occurs,
  350. // the log file will be created failed too.
  351. #if defined(ASIO2_ENABLE_LOG)
  352. asio2::detail::get_logger();
  353. #endif
  354. this->start_iopool();
  355. if (!this->is_iopool_started())
  356. {
  357. set_last_error(asio::error::operation_aborted);
  358. return false;
  359. }
  360. asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable
  361. {
  362. detail::ignore_unused(this_ptr);
  363. // init the running thread id
  364. derive.io_->init_thread_id();
  365. });
  366. // use promise to get the result of async connect
  367. std::promise<error_code> promise;
  368. std::future<error_code> future = promise.get_future();
  369. // use derfer to ensure the promise's value must be seted.
  370. detail::defer_event pg
  371. {
  372. [promise = std::move(promise)]() mutable
  373. {
  374. promise.set_value(get_last_error());
  375. }
  376. };
  377. derive.post_event(
  378. [this, this_ptr = derive.selfptr(), ecs = std::move(ecs), pg = std::move(pg),
  379. host = std::forward<String>(host), port = std::forward<StrOrInt>(port)]
  380. (event_queue_guard<derived_t> g) mutable
  381. {
  382. derived_t& derive = this->derived();
  383. defer_event chain
  384. {
  385. [pg = std::move(pg)](event_queue_guard<derived_t> g) mutable
  386. {
  387. detail::ignore_unused(pg, g);
  388. // the "pg" should destroyed before the "g", otherwise if the "g"
  389. // is destroyed before "pg", the next event maybe called, then the
  390. // state maybe change to not stopped.
  391. {
  392. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  393. }
  394. }, std::move(g)
  395. };
  396. state_t expected = state_t::stopped;
  397. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  398. {
  399. // if the state is not stopped, set the last error to already_started
  400. set_last_error(asio::error::already_started);
  401. return;
  402. }
  403. // must read/write ecs in the io_context thread.
  404. derive.ecs_ = ecs;
  405. derive.io_->regobj(&derive);
  406. #if defined(_DEBUG) || defined(DEBUG)
  407. this->is_stop_called_ = false;
  408. #endif
  409. // convert to string maybe throw some exception.
  410. this->host_ = detail::to_string(std::move(host));
  411. this->port_ = detail::to_string(std::move(port));
  412. expected = state_t::starting;
  413. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  414. {
  415. ASIO2_ASSERT(false);
  416. derive._handle_connect(asio::error::operation_aborted,
  417. std::move(this_ptr), std::move(ecs), std::move(chain));
  418. return;
  419. }
  420. error_code ec, ec_ignore;
  421. this->socket().cancel(ec_ignore);
  422. this->socket().close(ec_ignore);
  423. // parse address and port
  424. asio::ip::udp::resolver resolver(this->io_->context());
  425. auto results = resolver.resolve(this->host_, this->port_,
  426. asio::ip::resolver_base::flags::passive |
  427. asio::ip::resolver_base::flags::address_configured, ec);
  428. if (ec)
  429. {
  430. derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  431. return;
  432. }
  433. if (results.empty())
  434. {
  435. derive._handle_connect(asio::error::host_not_found,
  436. std::move(this_ptr), std::move(ecs), std::move(chain));
  437. return;
  438. }
  439. asio::ip::udp::endpoint endpoint = *results.begin();
  440. this->socket().open(endpoint.protocol(), ec);
  441. if (ec)
  442. {
  443. derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  444. return;
  445. }
  446. // when you close socket in linux system,and start socket
  447. // immediate,you will get like this "the address is in use",
  448. // and bind is failed,but i'm suer i close the socket correct
  449. // already before,why does this happen? the reasion is the
  450. // socket option "TIME_WAIT",although you close the socket,
  451. // but the system not release the socket,util 2~4 seconds later,
  452. // so we can use the SO_REUSEADDR option to avoid this problem,
  453. // like below
  454. // set port reuse
  455. this->socket().set_option(asio::ip::udp::socket::reuse_address(true), ec_ignore);
  456. //derive._socks5_init(ecs);
  457. clear_last_error();
  458. derive._fire_init();
  459. this->socket().bind(endpoint, ec);
  460. if (ec)
  461. {
  462. derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  463. return;
  464. }
  465. derive._post_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  466. });
  467. if constexpr (IsAsync)
  468. {
  469. set_last_error(asio::error::in_progress);
  470. return true;
  471. }
  472. else
  473. {
  474. if (!derive.io_->running_in_this_thread())
  475. {
  476. set_last_error(future.get());
  477. return static_cast<bool>(!get_last_error());
  478. }
  479. else
  480. {
  481. set_last_error(asio::error::in_progress);
  482. }
  483. // if the state is stopped , the return value is "is_started()".
  484. // if the state is stopping, the return value is false, the last error is already_started
  485. // if the state is starting, the return value is false, the last error is already_started
  486. // if the state is started , the return value is true , the last error is already_started
  487. return derive.is_started();
  488. }
  489. }
  490. template<typename C, typename DeferEvent>
  491. inline void _post_proxy(
  492. const error_code& ec,
  493. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  494. {
  495. set_last_error(ec);
  496. derived_t& derive = static_cast<derived_t&>(*this);
  497. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  498. //if constexpr (std::is_base_of_v<component_tag, detail::remove_cvref_t<C>>)
  499. //{
  500. // if (ec)
  501. // return derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  502. // if constexpr (C::has_socks5())
  503. // derive._socks5_start(std::move(this_ptr), std::move(ecs), std::move(chain));
  504. // else
  505. // derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  506. //}
  507. //else
  508. {
  509. derive._handle_proxy(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  510. }
  511. }
  512. template<typename C, typename DeferEvent>
  513. inline void _handle_proxy(
  514. const error_code& ec,
  515. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  516. {
  517. set_last_error(ec);
  518. derived_t& derive = static_cast<derived_t&>(*this);
  519. derive._handle_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  520. }
  521. template<typename C, typename DeferEvent>
  522. inline void _handle_connect(
  523. const error_code& ec,
  524. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  525. {
  526. set_last_error(ec);
  527. this->derived()._handle_start(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  528. }
  529. template<typename C, typename DeferEvent>
  530. inline void _handle_start(
  531. error_code ec,
  532. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  533. {
  534. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  535. // Whether the startup succeeds or fails, always call fire_start notification
  536. state_t expected = state_t::starting;
  537. if (!ec)
  538. if (!this->state_.compare_exchange_strong(expected, state_t::started))
  539. ec = asio::error::operation_aborted;
  540. set_last_error(ec);
  541. this->derived()._fire_start();
  542. expected = state_t::started;
  543. if (!ec)
  544. if (!this->state_.compare_exchange_strong(expected, state_t::started))
  545. ec = asio::error::operation_aborted;
  546. if (ec)
  547. {
  548. this->derived()._do_stop(ec, std::move(this_ptr), std::move(chain));
  549. return;
  550. }
  551. this->buffer_.consume(this->buffer_.size());
  552. this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
  553. }
  554. template<typename E = defer_event<void, derived_t>>
  555. inline void _do_disconnect(
  556. const error_code& ec, std::shared_ptr<derived_t> this_ptr, E chain = defer_event<void, derived_t>{})
  557. {
  558. derived_t& derive = this->derived();
  559. derive.dispatch([&derive, ec, this_ptr = std::move(this_ptr), chain = std::move(chain)]() mutable
  560. {
  561. derive._do_stop(ec, std::move(this_ptr), std::move(chain));
  562. });
  563. }
  564. template<typename E = defer_event<void, derived_t>>
  565. inline void _do_stop(
  566. const error_code& ec, std::shared_ptr<derived_t> this_ptr, E chain = defer_event<void, derived_t>{})
  567. {
  568. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  569. state_t expected = state_t::starting;
  570. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  571. return this->derived()._post_stop(ec, std::move(this_ptr), expected, std::move(chain));
  572. expected = state_t::started;
  573. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  574. return this->derived()._post_stop(ec, std::move(this_ptr), expected, std::move(chain));
  575. }
  576. template<typename DeferEvent>
  577. inline void _post_stop(
  578. const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state, DeferEvent chain)
  579. {
  580. // psot a recv signal to ensure that all recv events has finished already.
  581. this->derived().disp_event(
  582. [this, ec, this_ptr = std::move(this_ptr), old_state, e = chain.move_event()]
  583. (event_queue_guard<derived_t> g) mutable
  584. {
  585. detail::ignore_unused(old_state);
  586. // When the code runs here,no new session can be emplace or erase to session_mgr.
  587. // stop all the sessions, the session::stop must be no blocking,
  588. // otherwise it may be cause loop lock.
  589. set_last_error(ec);
  590. defer_event chain(std::move(e), std::move(g));
  591. state_t expected = state_t::stopping;
  592. if (this->state_.compare_exchange_strong(expected, state_t::stopped))
  593. {
  594. this->derived()._fire_stop();
  595. // call CRTP polymorphic stop
  596. this->derived()._handle_stop(ec, std::move(this_ptr), std::move(chain));
  597. }
  598. else
  599. {
  600. ASIO2_ASSERT(false);
  601. }
  602. }, chain.move_guard());
  603. }
  604. template<typename DeferEvent>
  605. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  606. {
  607. derived_t& derive = this->derived();
  608. detail::ignore_unused(ec, this_ptr, chain);
  609. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  610. // close user custom timers
  611. derive._dispatch_stop_all_timers();
  612. // close all posted timed tasks
  613. derive._dispatch_stop_all_timed_events();
  614. // close all async_events
  615. derive.notify_all_condition_events();
  616. //derive._socks5_stop();
  617. error_code ec_ignore{};
  618. // call socket's close function to notify the _handle_recv function
  619. // response with error > 0 ,then the socket can get notify to exit
  620. // Call shutdown() to indicate that you will not write any more data to the socket.
  621. derive.socket().shutdown(asio::socket_base::shutdown_both, ec_ignore);
  622. derive.socket().cancel(ec_ignore);
  623. // Call close,otherwise the _handle_recv will never return
  624. derive.socket().close(ec_ignore);
  625. // clear recv buffer
  626. derive.buffer().consume(derive.buffer().size());
  627. // destroy user data, maybe the user data is self shared_ptr,
  628. // if don't destroy it, will cause loop reference.
  629. derive.user_data_.reset();
  630. // destroy the ecs
  631. derive.ecs_.reset();
  632. //
  633. derive.reset_life_id();
  634. }
  635. protected:
  636. template<class Endpoint, class Data, class Callback>
  637. inline bool _do_send(Endpoint& endpoint, Data& data, Callback&& callback)
  638. {
  639. return this->derived()._udp_send_to(endpoint, data, std::forward<Callback>(callback));
  640. }
  641. protected:
  642. template<typename C>
  643. void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  644. {
  645. if (!this->is_started())
  646. {
  647. if (this->derived().state_ == state_t::started)
  648. {
  649. this->derived()._do_stop(asio2::get_last_error(), std::move(this_ptr));
  650. }
  651. return;
  652. }
  653. #if defined(_DEBUG) || defined(DEBUG)
  654. ASIO2_ASSERT(this->derived().post_recv_counter_.load() == 0);
  655. this->derived().post_recv_counter_++;
  656. #endif
  657. this->socket().async_receive_from(
  658. this->buffer_.prepare(this->buffer_.pre_size()),
  659. this->remote_endpoint_,
  660. make_allocator(this->rallocator_,
  661. [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  662. (const error_code& ec, std::size_t bytes_recvd) mutable
  663. {
  664. #if defined(_DEBUG) || defined(DEBUG)
  665. this->derived().post_recv_counter_--;
  666. #endif
  667. this->derived()._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  668. }));
  669. }
  670. template<typename C>
  671. void _handle_recv(
  672. const error_code& ec, std::size_t bytes_recvd,
  673. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  674. {
  675. set_last_error(ec);
  676. if (!this->derived().is_started())
  677. {
  678. if (this->derived().state_ == state_t::started)
  679. {
  680. this->derived()._do_stop(ec, std::move(this_ptr));
  681. }
  682. return;
  683. }
  684. if (ec == asio::error::operation_aborted)
  685. {
  686. this->derived()._do_stop(ec, std::move(this_ptr));
  687. return;
  688. }
  689. this->buffer_.commit(bytes_recvd);
  690. if (!ec)
  691. {
  692. this->derived()._fire_recv(this_ptr, ecs,
  693. std::string_view(static_cast<std::string_view::const_pointer>(
  694. this->buffer_.data().data()), bytes_recvd));
  695. }
  696. this->buffer_.consume(this->buffer_.size());
  697. if (bytes_recvd == this->buffer_.pre_size())
  698. {
  699. this->buffer_.pre_size((std::min)(this->buffer_.pre_size() * 2, this->buffer_.max_size()));
  700. }
  701. this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
  702. }
  703. inline void _fire_init()
  704. {
  705. // the _fire_init must be executed in the thread 0.
  706. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  707. ASIO2_ASSERT(!get_last_error());
  708. this->listener_.notify(event_type::init);
  709. }
  710. template<typename C>
  711. inline void _fire_recv(
  712. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
  713. {
  714. detail::ignore_unused(this_ptr, ecs);
  715. data = detail::call_data_filter_before_recv(this->derived(), data);
  716. this->listener_.notify(event_type::recv, this->remote_endpoint_, data);
  717. }
  718. inline void _fire_start()
  719. {
  720. // the _fire_start must be executed in the thread 0.
  721. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  722. #if defined(_DEBUG) || defined(DEBUG)
  723. ASIO2_ASSERT(this->is_stop_called_ == false);
  724. #endif
  725. this->listener_.notify(event_type::start);
  726. }
  727. inline void _fire_stop()
  728. {
  729. // the _fire_stop must be executed in the thread 0.
  730. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  731. #if defined(_DEBUG) || defined(DEBUG)
  732. this->is_stop_called_ = true;
  733. #endif
  734. this->listener_.notify(event_type::stop);
  735. }
  736. public:
  737. /**
  738. * @brief get the buffer object reference
  739. */
  740. inline buffer_wrap<buffer_type> & buffer() noexcept { return this->buffer_; }
  741. protected:
  742. /**
  743. * @brief get the recv/read allocator object reference
  744. */
  745. inline auto & rallocator() noexcept { return this->rallocator_; }
  746. /**
  747. * @brief get the send/write allocator object reference
  748. */
  749. inline auto & wallocator() noexcept { return this->wallocator_; }
  750. inline const char* life_id () noexcept { return this->life_id_.get(); }
  751. inline void reset_life_id () noexcept { this->life_id_ = std::make_unique<char>(); }
  752. protected:
  753. /// The memory to use for handler-based custom memory allocation. used fo recv/read.
  754. handler_memory<std::true_type , assizer<args_t>> rallocator_;
  755. /// The memory to use for handler-based custom memory allocation. used fo send/write.
  756. handler_memory<std::false_type, assizer<args_t>> wallocator_;
  757. /// listener
  758. listener_t listener_;
  759. /// buffer
  760. buffer_wrap<buffer_type> buffer_;
  761. /// state
  762. std::atomic<state_t> state_ = state_t::stopped;
  763. /// the pointer of ecs_t
  764. std::shared_ptr<ecs_base> ecs_;
  765. /// @see client life id
  766. std::unique_ptr<char> life_id_ = std::make_unique<char>();
  767. #if defined(_DEBUG) || defined(DEBUG)
  768. bool is_stop_called_ = false;
  769. std::atomic<int> post_send_counter_ = 0;
  770. std::atomic<int> post_recv_counter_ = 0;
  771. #endif
  772. };
  773. }
  774. namespace asio2
  775. {
  776. using udp_cast_args = detail::template_args_udp_cast;
  777. template<class derived_t, class args_t>
  778. using udp_cast_impl_t = detail::udp_cast_impl_t<derived_t, args_t>;
  779. /**
  780. * udp unicast/multicast/broadcast
  781. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  782. * asio::error::no_descriptors - Too many open files
  783. */
  784. template<class derived_t>
  785. class udp_cast_t : public detail::udp_cast_impl_t<derived_t, detail::template_args_udp_cast>
  786. {
  787. public:
  788. using detail::udp_cast_impl_t<derived_t, detail::template_args_udp_cast>::udp_cast_impl_t;
  789. };
  790. /*
  791. * udp unicast/multicast/broadcast
  792. * If this object is created as a shared_ptr like std::shared_ptr<asio2::udp_cast> cast;
  793. * you must call the cast->stop() manual when exit, otherwise maybe cause memory leaks.
  794. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  795. * asio::error::no_descriptors - Too many open files
  796. */
  797. class udp_cast : public udp_cast_t<udp_cast>
  798. {
  799. public:
  800. using udp_cast_t<udp_cast>::udp_cast_t;
  801. };
  802. }
  803. #include <asio2/base/detail/pop_options.hpp>
  804. #endif // !__ASIO2_UDP_CAST_HPP__