udp_client.hpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_UDP_CLIENT_HPP__
  11. #define __ASIO2_UDP_CLIENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/base/client.hpp>
  17. #include <asio2/base/detail/linear_buffer.hpp>
  18. #include <asio2/udp/detail/kcp_util.hpp>
  19. #include <asio2/udp/impl/udp_send_op.hpp>
  20. #include <asio2/udp/impl/udp_recv_op.hpp>
  21. #include <asio2/udp/impl/kcp_stream_cp.hpp>
  22. #include <asio2/proxy/socks5_client.hpp>
  23. namespace asio2::detail
  24. {
  25. struct template_args_udp_client : public udp_tag
  26. {
  27. static constexpr bool is_session = false;
  28. static constexpr bool is_client = true;
  29. static constexpr bool is_server = false;
  30. using socket_t = asio::ip::udp::socket;
  31. using buffer_t = asio2::linear_buffer;
  32. using send_data_t = std::string_view;
  33. using recv_data_t = std::string_view;
  34. template<class derived_t>
  35. using socks5_client_t = asio2::socks5_tcp_client_t<derived_t>;
  36. static constexpr std::size_t allocator_storage_size = 256;
  37. };
  38. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  39. ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;
  40. ASIO2_CLASS_FORWARD_DECLARE_UDP_CLIENT;
  41. template<class derived_t, class args_t = template_args_udp_client>
  42. class udp_client_impl_t
  43. : public client_impl_t<derived_t, args_t>
  44. , public udp_send_op <derived_t, args_t>
  45. , public udp_recv_op <derived_t, args_t>
  46. , public udp_tag
  47. {
  48. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  49. ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;
  50. ASIO2_CLASS_FRIEND_DECLARE_UDP_CLIENT;
  51. public:
  52. using super = client_impl_t <derived_t, args_t>;
  53. using self = udp_client_impl_t<derived_t, args_t>;
  54. using args_type = args_t;
  55. using buffer_type = typename args_t::buffer_t;
  56. using send_data_t = typename args_t::send_data_t;
  57. using recv_data_t = typename args_t::recv_data_t;
  58. public:
  59. /**
  60. * @brief constructor
  61. */
  62. explicit udp_client_impl_t(
  63. std::size_t init_buf_size = udp_frame_size,
  64. std::size_t max_buf_size = max_buffer_size,
  65. std::size_t concurrency = 1
  66. )
  67. : super(init_buf_size, max_buf_size, concurrency)
  68. , udp_send_op<derived_t, args_t>()
  69. , udp_recv_op<derived_t, args_t>()
  70. {
  71. this->set_connect_timeout(std::chrono::milliseconds(udp_connect_timeout));
  72. }
  73. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  74. explicit udp_client_impl_t(
  75. std::size_t init_buf_size,
  76. std::size_t max_buf_size,
  77. Scheduler && scheduler
  78. )
  79. : super(init_buf_size, max_buf_size, std::forward<Scheduler>(scheduler))
  80. , udp_send_op<derived_t, args_t>()
  81. , udp_recv_op<derived_t, args_t>()
  82. {
  83. this->set_connect_timeout(std::chrono::milliseconds(udp_connect_timeout));
  84. }
  85. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  86. explicit udp_client_impl_t(Scheduler && scheduler)
  87. : udp_client_impl_t(udp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))
  88. {
  89. }
  90. /**
  91. * @brief destructor
  92. */
  93. ~udp_client_impl_t()
  94. {
  95. this->stop();
  96. }
  97. /**
  98. * @brief start the client, blocking connect to server
  99. * @param host - A string identifying a location. May be a descriptive name or
  100. * a numeric address string.
  101. * @param port - A string identifying the requested service. This may be a
  102. * descriptive name or a numeric string corresponding to a port number.
  103. */
  104. template<typename String, typename StrOrInt, typename... Args>
  105. inline bool start(String&& host, StrOrInt&& port, Args&&... args)
  106. {
  107. return this->derived().template _do_connect<false>(
  108. std::forward<String>(host), std::forward<StrOrInt>(port),
  109. ecs_helper::make_ecs('0', std::forward<Args>(args)...));
  110. }
  111. /**
  112. * @brief start the client, asynchronous connect to server
  113. * @param host - A string identifying a location. May be a descriptive name or
  114. * a numeric address string.
  115. * @param port - A string identifying the requested service. This may be a
  116. * descriptive name or a numeric string corresponding to a port number.
  117. */
  118. template<typename String, typename StrOrInt, typename... Args>
  119. inline bool async_start(String&& host, StrOrInt&& port, Args&&... args)
  120. {
  121. return this->derived().template _do_connect<true>(
  122. std::forward<String>(host), std::forward<StrOrInt>(port),
  123. ecs_helper::make_ecs('0', std::forward<Args>(args)...));
  124. }
  125. /**
  126. * @brief stop the client
  127. * You can call this function in the communication thread and anywhere to stop the client.
  128. * If this function is called in the communication thread, it will post a asynchronous
  129. * event into the event queue, then return immediately.
  130. * If this function is called not in the communication thread, it will blocking forever
  131. * util the client is stopped completed.
  132. */
  133. inline void stop()
  134. {
  135. if (this->is_iopool_stopped())
  136. return;
  137. derived_t& derive = this->derived();
  138. derive.io_->unregobj(&derive);
  139. // use promise to get the result of stop
  140. std::promise<state_t> promise;
  141. std::future<state_t> future = promise.get_future();
  142. // use derfer to ensure the promise's value must be seted.
  143. detail::defer_event pg
  144. {
  145. [this, p = std::move(promise)]() mutable
  146. {
  147. p.set_value(this->state_.load());
  148. }
  149. };
  150. // if user call stop in the recv callback, use post event to executed a async event.
  151. derive.post_event([&derive, this_ptr = derive.selfptr(), pg = std::move(pg)]
  152. (event_queue_guard<derived_t> g) mutable
  153. {
  154. // first close the reconnect timer
  155. derive._stop_reconnect_timer();
  156. derive._do_disconnect(asio::error::operation_aborted, derive.selfptr(), defer_event
  157. {
  158. [&derive, this_ptr = std::move(this_ptr), pg = std::move(pg)]
  159. (event_queue_guard<derived_t> g) mutable
  160. {
  161. derive._do_stop(asio::error::operation_aborted, std::move(this_ptr), defer_event
  162. {
  163. [pg = std::move(pg)](event_queue_guard<derived_t> g) mutable
  164. {
  165. detail::ignore_unused(pg, g);
  166. // the "pg" should destroyed before the "g", otherwise if the "g"
  167. // is destroyed before "pg", the next event maybe called, then the
  168. // state maybe change to not stopped.
  169. {
  170. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  171. }
  172. }, std::move(g)
  173. });
  174. }, std::move(g)
  175. });
  176. });
  177. while (!derive.running_in_this_thread())
  178. {
  179. std::future_status status = future.wait_for(std::chrono::milliseconds(100));
  180. if (status == std::future_status::ready)
  181. {
  182. ASIO2_ASSERT(future.get() == state_t::stopped);
  183. break;
  184. }
  185. else
  186. {
  187. if (derive.get_thread_id() == std::thread::id{})
  188. break;
  189. if (derive.io_->context().stopped())
  190. break;
  191. }
  192. }
  193. this->stop_iopool();
  194. }
  195. public:
  196. /**
  197. * @brief get the kcp stream component
  198. */
  199. inline kcp_stream_cp<derived_t, args_t>* get_kcp_stream() noexcept
  200. {
  201. return this->kcp_stream_.get();
  202. }
  203. /**
  204. * @brief get the kcp stream component
  205. */
  206. inline const kcp_stream_cp<derived_t, args_t>* get_kcp_stream() const noexcept
  207. {
  208. return this->kcp_stream_.get();
  209. }
  210. /**
  211. * @brief get the kcp pointer, just used for kcp mode
  212. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  213. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  214. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  215. */
  216. inline kcp::ikcpcb* get_kcp() noexcept
  217. {
  218. return (this->kcp_stream_ ? this->kcp_stream_->kcp_ : nullptr);
  219. }
  220. /**
  221. * @brief get the kcp pointer, just used for kcp mode
  222. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  223. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  224. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  225. */
  226. inline const kcp::ikcpcb* get_kcp() const noexcept
  227. {
  228. return (this->kcp_stream_ ? this->kcp_stream_->kcp_ : nullptr);
  229. }
  230. /**
  231. * @brief get the kcp pointer, just used for kcp mode. same as get_kcp
  232. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  233. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  234. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  235. */
  236. inline kcp::ikcpcb* kcp() noexcept
  237. {
  238. return this->get_kcp();
  239. }
  240. /**
  241. * @brief get the kcp pointer, just used for kcp mode. same as get_kcp
  242. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  243. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  244. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  245. */
  246. inline const kcp::ikcpcb* kcp() const noexcept
  247. {
  248. return this->get_kcp();
  249. }
  250. /**
  251. * @brief Provide a kcp conv, If the conv is not provided, it will be generated by the server.
  252. */
  253. inline derived_t& set_kcp_conv(std::uint32_t conv)
  254. {
  255. this->kcp_conv_ = conv;
  256. return (this->derived());
  257. }
  258. public:
  259. /**
  260. * @brief bind recv listener
  261. * @param fun - a user defined callback function.
  262. * @param obj - a pointer or reference to a class object, this parameter can be none.
  263. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  264. * the class object's pointer or reference.
  265. * Function signature : void(std::string_view data)
  266. */
  267. template<class F, class ...C>
  268. inline derived_t & bind_recv(F&& fun, C&&... obj)
  269. {
  270. this->listener_.bind(event_type::recv,
  271. observer_t<std::string_view>(std::forward<F>(fun), std::forward<C>(obj)...));
  272. return (this->derived());
  273. }
  274. /**
  275. * @brief bind connect listener
  276. * @param fun - a user defined callback function.
  277. * @param obj - a pointer or reference to a class object, this parameter can be none.
  278. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  279. * the class object's pointer or reference.
  280. * This notification is called after the client connection completed, whether successful or unsuccessful
  281. * Function signature : void()
  282. */
  283. template<class F, class ...C>
  284. inline derived_t & bind_connect(F&& fun, C&&... obj)
  285. {
  286. this->listener_.bind(event_type::connect,
  287. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  288. return (this->derived());
  289. }
  290. /**
  291. * @brief bind disconnect listener
  292. * @param fun - a user defined callback function.
  293. * @param obj - a pointer or reference to a class object, this parameter can be none.
  294. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  295. * the class object's pointer or reference.
  296. * This notification is called before the client is ready to disconnect
  297. * Function signature : void()
  298. */
  299. template<class F, class ...C>
  300. inline derived_t & bind_disconnect(F&& fun, C&&... obj)
  301. {
  302. this->listener_.bind(event_type::disconnect,
  303. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  304. return (this->derived());
  305. }
  306. /**
  307. * @brief bind init listener,we should set socket options at here
  308. * @param fun - a user defined callback function.
  309. * @param obj - a pointer or reference to a class object, this parameter can be none.
  310. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  311. * the class object's pointer or reference.
  312. * Function signature : void()
  313. */
  314. template<class F, class ...C>
  315. inline derived_t & bind_init(F&& fun, C&&... obj)
  316. {
  317. this->listener_.bind(event_type::init,
  318. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  319. return (this->derived());
  320. }
  321. /**
  322. * @brief bind kcp handshake listener, just used fo kcp mode
  323. * @param fun - a user defined callback function.
  324. * @param obj - a pointer or reference to a class object, this parameter can be none.
  325. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  326. * the class object's pointer or reference.
  327. * Function signature : void()
  328. */
  329. template<class F, class ...C>
  330. inline derived_t & bind_handshake(F&& fun, C&&... obj)
  331. {
  332. this->listener_.bind(event_type::handshake,
  333. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  334. return (this->derived());
  335. }
  336. protected:
  337. template<bool IsAsync, typename String, typename StrOrInt, typename C>
  338. inline bool _do_connect(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
  339. {
  340. derived_t& derive = this->derived();
  341. // if log is enabled, init the log first, otherwise when "Too many open files" error occurs,
  342. // the log file will be created failed too.
  343. #if defined(ASIO2_ENABLE_LOG)
  344. asio2::detail::get_logger();
  345. #endif
  346. this->start_iopool();
  347. if (!this->is_iopool_started())
  348. {
  349. set_last_error(asio::error::operation_aborted);
  350. return false;
  351. }
  352. asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable
  353. {
  354. detail::ignore_unused(this_ptr);
  355. // init the running thread id
  356. derive.io_->init_thread_id();
  357. });
  358. // use promise to get the result of async connect
  359. std::promise<error_code> promise;
  360. std::future<error_code> future = promise.get_future();
  361. // use derfer to ensure the promise's value must be seted.
  362. detail::defer_event pg
  363. {
  364. [promise = std::move(promise)]() mutable
  365. {
  366. promise.set_value(get_last_error());
  367. }
  368. };
  369. // if user call start in the recv callback, use post event to executed a async event.
  370. derive.post_event(
  371. [this, this_ptr = derive.selfptr(), ecs = std::move(ecs), pg = std::move(pg),
  372. host = std::forward<String>(host), port = std::forward<StrOrInt>(port)]
  373. (event_queue_guard<derived_t> g) mutable
  374. {
  375. derived_t& derive = this->derived();
  376. defer_event chain
  377. {
  378. [pg = std::move(pg)] (event_queue_guard<derived_t> g) mutable
  379. {
  380. detail::ignore_unused(pg, g);
  381. // the "pg" should destroyed before the "g", otherwise if the "g"
  382. // is destroyed before "pg", the next event maybe called, then the
  383. // state maybe change to not stopped.
  384. {
  385. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  386. }
  387. }, std::move(g)
  388. };
  389. state_t expected = state_t::stopped;
  390. if (!derive.state_.compare_exchange_strong(expected, state_t::starting))
  391. {
  392. // if the state is not stopped, set the last error to already_started
  393. set_last_error(asio::error::already_started);
  394. return;
  395. }
  396. // must read/write ecs in the io_context thread.
  397. derive.ecs_ = ecs;
  398. clear_last_error();
  399. derive.io_->regobj(&derive);
  400. #if defined(_DEBUG) || defined(DEBUG)
  401. this->is_stop_reconnect_timer_called_ = false;
  402. this->is_post_reconnect_timer_called_ = false;
  403. this->is_stop_connect_timeout_timer_called_ = false;
  404. this->is_disconnect_called_ = false;
  405. #endif
  406. // convert to string maybe throw some exception.
  407. this->host_ = detail::to_string(std::move(host));
  408. this->port_ = detail::to_string(std::move(port));
  409. super::start();
  410. derive._do_init(ecs);
  411. // ecs init
  412. derive._rdc_init(ecs);
  413. derive._socks5_init(ecs);
  414. derive.template _start_connect<IsAsync>(std::move(this_ptr), std::move(ecs), std::move(chain));
  415. });
  416. if constexpr (IsAsync)
  417. {
  418. set_last_error(asio::error::in_progress);
  419. return true;
  420. }
  421. else
  422. {
  423. if (!derive.io_->running_in_this_thread())
  424. {
  425. set_last_error(future.get());
  426. return static_cast<bool>(!get_last_error());
  427. }
  428. else
  429. {
  430. set_last_error(asio::error::in_progress);
  431. }
  432. // if the state is stopped , the return value is "is_started()".
  433. // if the state is stopping, the return value is false, the last error is already_started
  434. // if the state is starting, the return value is false, the last error is already_started
  435. // if the state is started , the return value is true , the last error is already_started
  436. return derive.is_started();
  437. }
  438. }
  439. template<typename C>
  440. inline void _do_init(std::shared_ptr<ecs_t<C>>&)
  441. {
  442. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_kcp_t>)
  443. this->kcp_stream_ = std::make_unique<kcp_stream_cp<derived_t, args_t>>(
  444. this->derived(), this->io_->context());
  445. else
  446. this->kcp_stream_.reset();
  447. }
  448. template<typename C, typename DeferEvent>
  449. inline void _handle_connect(
  450. const error_code& ec,
  451. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  452. {
  453. set_last_error(ec);
  454. derived_t& derive = static_cast<derived_t&>(*this);
  455. if (ec)
  456. return derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  457. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_kcp_t>)
  458. this->kcp_stream_->_post_handshake(std::move(this_ptr), std::move(ecs), std::move(chain));
  459. else
  460. derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  461. }
  462. template<typename C, typename DeferEvent>
  463. inline void _do_start(
  464. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  465. {
  466. this->derived().update_alive_time();
  467. this->derived().reset_connect_time();
  468. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_kcp_t>)
  469. {
  470. ASIO2_ASSERT(this->kcp_stream_);
  471. if (this->kcp_stream_)
  472. this->kcp_stream_->send_fin_ = true;
  473. }
  474. this->derived()._start_recv(std::move(this_ptr), std::move(ecs), std::move(chain));
  475. }
  476. template<typename DeferEvent>
  477. inline void _handle_disconnect(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  478. {
  479. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  480. ASIO2_ASSERT(this->state_ == state_t::stopped);
  481. this->derived()._rdc_stop();
  482. if (this->kcp_stream_)
  483. this->kcp_stream_->_kcp_stop();
  484. error_code ec_ignore{};
  485. // call socket's close function to notify the _handle_recv function response with
  486. // error > 0 ,then the socket can get notify to exit
  487. // Call shutdown() to indicate that you will not write any more data to the socket.
  488. this->socket().shutdown(asio::socket_base::shutdown_both, ec_ignore);
  489. this->socket().cancel(ec_ignore);
  490. // Call close,otherwise the _handle_recv will never return
  491. this->socket().close(ec_ignore);
  492. super::_handle_disconnect(ec, std::move(this_ptr), std::move(chain));
  493. }
  494. template<typename DeferEvent>
  495. inline void _do_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  496. {
  497. ASIO2_ASSERT(this->state_ == state_t::stopped);
  498. this->derived()._post_stop(ec, std::move(this_ptr), std::move(chain));
  499. }
  500. template<typename DeferEvent>
  501. inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  502. {
  503. // All pending sending events will be cancelled after enter the callback below.
  504. this->derived().disp_event(
  505. [this, ec, this_ptr = std::move(this_ptr), e = chain.move_event()]
  506. (event_queue_guard<derived_t> g) mutable
  507. {
  508. set_last_error(ec);
  509. defer_event chain(std::move(e), std::move(g));
  510. // call the base class stop function
  511. super::stop();
  512. // call CRTP polymorphic stop
  513. this->derived()._handle_stop(ec, std::move(this_ptr), std::move(chain));
  514. }, chain.move_guard());
  515. }
  516. template<typename DeferEvent>
  517. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  518. {
  519. detail::ignore_unused(ec, this_ptr, chain);
  520. this->derived()._socks5_stop();
  521. ASIO2_ASSERT(this->state_ == state_t::stopped);
  522. }
  523. template<typename C, typename DeferEvent>
  524. inline void _start_recv(
  525. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  526. {
  527. // Connect succeeded. post recv request.
  528. asio::dispatch(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
  529. [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  530. () mutable
  531. {
  532. detail::ignore_unused(chain);
  533. this->derived().buffer().consume(this->derived().buffer().size());
  534. this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
  535. }));
  536. }
  537. template<class Data, class Callback>
  538. inline bool _do_send(Data& data, Callback&& callback)
  539. {
  540. if (!this->kcp_stream_)
  541. return this->derived()._udp_send(data, std::forward<Callback>(callback));
  542. return this->kcp_stream_->_kcp_send(data, std::forward<Callback>(callback));
  543. }
  544. template<class Data>
  545. inline send_data_t _rdc_convert_to_send_data(Data& data) noexcept
  546. {
  547. auto buffer = asio::buffer(data);
  548. return send_data_t{ reinterpret_cast<
  549. std::string_view::const_pointer>(buffer.data()),buffer.size() };
  550. }
  551. template<class Invoker>
  552. inline void _rdc_invoke_with_none(const error_code& ec, Invoker& invoker)
  553. {
  554. if (invoker)
  555. invoker(ec, send_data_t{}, recv_data_t{});
  556. }
  557. template<class Invoker>
  558. inline void _rdc_invoke_with_recv(const error_code& ec, Invoker& invoker, recv_data_t data)
  559. {
  560. if (invoker)
  561. invoker(ec, send_data_t{}, data);
  562. }
  563. template<class Invoker>
  564. inline void _rdc_invoke_with_send(const error_code& ec, Invoker& invoker, send_data_t data)
  565. {
  566. if (invoker)
  567. invoker(ec, data, recv_data_t{});
  568. }
  569. protected:
  570. template<typename C>
  571. inline void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  572. {
  573. this->derived()._udp_post_recv(std::move(this_ptr), std::move(ecs));
  574. }
  575. template<typename C>
  576. inline void _handle_recv(
  577. const error_code& ec, std::size_t bytes_recvd,
  578. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  579. {
  580. this->derived()._udp_handle_recv(ec, bytes_recvd, this_ptr, ecs);
  581. }
  582. inline void _fire_init()
  583. {
  584. // the _fire_init must be executed in the thread 0.
  585. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  586. ASIO2_ASSERT(!get_last_error());
  587. this->listener_.notify(event_type::init);
  588. }
  589. template<typename C>
  590. inline void _fire_recv(
  591. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
  592. {
  593. data = detail::call_data_filter_before_recv(this->derived(), data);
  594. this->listener_.notify(event_type::recv, data);
  595. this->derived()._rdc_handle_recv(this_ptr, ecs, data);
  596. }
  597. inline void _fire_handshake(std::shared_ptr<derived_t>& this_ptr)
  598. {
  599. // the _fire_handshake must be executed in the thread 0.
  600. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  601. detail::ignore_unused(this_ptr);
  602. this->listener_.notify(event_type::handshake);
  603. }
  604. template<typename C>
  605. inline void _fire_connect(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  606. {
  607. // the _fire_connect must be executed in the thread 0.
  608. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  609. #if defined(_DEBUG) || defined(DEBUG)
  610. ASIO2_ASSERT(this->is_disconnect_called_ == false);
  611. #endif
  612. if (!get_last_error())
  613. {
  614. this->derived()._rdc_start(this_ptr, ecs);
  615. }
  616. this->listener_.notify(event_type::connect);
  617. }
  618. inline void _fire_disconnect(std::shared_ptr<derived_t>& this_ptr)
  619. {
  620. // the _fire_disconnect must be executed in the thread 0.
  621. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  622. #if defined(_DEBUG) || defined(DEBUG)
  623. this->is_disconnect_called_ = true;
  624. #endif
  625. detail::ignore_unused(this_ptr);
  626. this->listener_.notify(event_type::disconnect);
  627. }
  628. protected:
  629. std::unique_ptr<kcp_stream_cp<derived_t, args_t>> kcp_stream_;
  630. std::uint32_t kcp_conv_ = 0;
  631. #if defined(_DEBUG) || defined(DEBUG)
  632. bool is_disconnect_called_ = false;
  633. #endif
  634. };
  635. }
  636. namespace asio2
  637. {
  638. using udp_client_args = detail::template_args_udp_client;
  639. template<class derived_t, class args_t>
  640. using udp_client_impl_t = detail::udp_client_impl_t<derived_t, args_t>;
  641. /**
  642. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  643. * asio::error::no_descriptors - Too many open files
  644. */
  645. template<class derived_t>
  646. class udp_client_t : public detail::udp_client_impl_t<derived_t, detail::template_args_udp_client>
  647. {
  648. public:
  649. using detail::udp_client_impl_t<derived_t, detail::template_args_udp_client>::udp_client_impl_t;
  650. };
  651. /**
  652. * If this object is created as a shared_ptr like std::shared_ptr<asio2::udp_client> client;
  653. * you must call the client->stop() manual when exit, otherwise maybe cause memory leaks.
  654. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  655. * asio::error::no_descriptors - Too many open files
  656. */
  657. class udp_client : public udp_client_t<udp_client>
  658. {
  659. public:
  660. using udp_client_t<udp_client>::udp_client_t;
  661. };
  662. }
  663. #include <asio2/base/detail/pop_options.hpp>
  664. #endif // !__ASIO2_UDP_CLIENT_HPP__