udp_session.hpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_UDP_SESSION_HPP__
  11. #define __ASIO2_UDP_SESSION_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/base/session.hpp>
  17. #include <asio2/base/detail/linear_buffer.hpp>
  18. #include <asio2/udp/detail/kcp_util.hpp>
  19. #include <asio2/udp/impl/udp_send_op.hpp>
  20. #include <asio2/udp/impl/udp_recv_op.hpp>
  21. #include <asio2/udp/impl/kcp_stream_cp.hpp>
  22. namespace asio2::detail
  23. {
  24. struct template_args_udp_session : public udp_tag
  25. {
  26. static constexpr bool is_session = true;
  27. static constexpr bool is_client = false;
  28. static constexpr bool is_server = false;
  29. using socket_t = asio::ip::udp::socket;
  30. using buffer_t = detail::proxy_buffer<asio2::linear_buffer>;
  31. using send_data_t = std::string_view;
  32. using recv_data_t = std::string_view;
  33. static constexpr std::size_t allocator_storage_size = 256;
  34. };
  35. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  36. ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;
  37. ASIO2_CLASS_FORWARD_DECLARE_UDP_SERVER;
  38. ASIO2_CLASS_FORWARD_DECLARE_UDP_SESSION;
  39. template<class derived_t, class args_t = template_args_udp_session>
  40. class udp_session_impl_t
  41. : public session_impl_t<derived_t, args_t>
  42. , public udp_send_op <derived_t, args_t>
  43. , public udp_recv_op <derived_t, args_t>
  44. , public udp_tag
  45. {
  46. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  47. ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;
  48. ASIO2_CLASS_FRIEND_DECLARE_UDP_SERVER;
  49. ASIO2_CLASS_FRIEND_DECLARE_UDP_SESSION;
  50. public:
  51. using super = session_impl_t <derived_t, args_t>;
  52. using self = udp_session_impl_t<derived_t, args_t>;
  53. using args_type = args_t;
  54. using key_type = asio::ip::udp::endpoint;
  55. using buffer_type = typename args_t::buffer_t;
  56. using send_data_t = typename args_t::send_data_t;
  57. using recv_data_t = typename args_t::recv_data_t;
  58. public:
  59. /**
  60. * @brief constructor
  61. */
  62. explicit udp_session_impl_t(
  63. session_mgr_t<derived_t> & sessions,
  64. listener_t & listener,
  65. std::shared_ptr<io_t> rwio,
  66. std::size_t init_buf_size,
  67. std::size_t max_buf_size,
  68. asio2::linear_buffer & buffer,
  69. std::shared_ptr<typename args_t::socket_t> socket,
  70. asio::ip::udp::endpoint & endpoint
  71. )
  72. : super(sessions, listener, std::move(rwio), init_buf_size, max_buf_size, std::move(socket))
  73. , udp_send_op<derived_t, args_t>()
  74. , udp_recv_op<derived_t, args_t>()
  75. , wallocator_ ()
  76. {
  77. this->remote_endpoint_ = endpoint;
  78. this->buffer_.bind_buffer(&buffer);
  79. this->set_silence_timeout(std::chrono::milliseconds(udp_silence_timeout));
  80. this->set_connect_timeout(std::chrono::milliseconds(udp_connect_timeout));
  81. }
  82. /**
  83. * @brief destructor
  84. */
  85. ~udp_session_impl_t()
  86. {
  87. }
  88. protected:
  89. /**
  90. * @brief start this session for prepare to recv msg
  91. */
  92. template<typename C>
  93. inline void start(std::shared_ptr<ecs_t<C>> ecs)
  94. {
  95. derived_t& derive = this->derived();
  96. #if defined(ASIO2_ENABLE_LOG)
  97. #if defined(ASIO2_ALLOCATOR_STORAGE_SIZE)
  98. static_assert(decltype(wallocator_)::storage_size == ASIO2_ALLOCATOR_STORAGE_SIZE);
  99. #else
  100. static_assert(decltype(wallocator_)::storage_size == args_t::allocator_storage_size);
  101. #endif
  102. #endif
  103. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread());
  104. ASIO2_ASSERT(this->io_->get_thread_id() != std::thread::id{});
  105. #if defined(_DEBUG) || defined(DEBUG)
  106. this->is_stop_silence_timer_called_ = false;
  107. this->is_stop_connect_timeout_timer_called_ = false;
  108. this->is_disconnect_called_ = false;
  109. #endif
  110. std::shared_ptr<derived_t> this_ptr = derive.selfptr();
  111. state_t expected = state_t::stopped;
  112. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  113. {
  114. derive._do_disconnect(asio::error::already_started, std::move(this_ptr));
  115. return;
  116. }
  117. // must read/write ecs in the io_context thread.
  118. derive.ecs_ = ecs;
  119. derive._do_init(this_ptr, ecs);
  120. // First call the base class start function
  121. super::start();
  122. // if the ecs has remote data call mode,do some thing.
  123. derive._rdc_init(ecs);
  124. derive.push_event(
  125. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  126. (event_queue_guard<derived_t> g) mutable
  127. {
  128. derive.sessions_.dispatch(
  129. [&derive, this_ptr, ecs = std::move(ecs), g = std::move(g)]
  130. () mutable
  131. {
  132. derive._handle_connect(
  133. error_code{}, std::move(this_ptr), std::move(ecs), defer_event(std::move(g)));
  134. });
  135. });
  136. }
  137. public:
  /**
   * @brief stop session
   * You can call this function in the communication thread or anywhere else to stop the session.
   * If this function is called in the communication thread, it will post an asynchronous
   * event into the event queue, then return immediately.
   * If this function is not called in the communication thread, it will block
   * until the session has stopped completely.
   * note : this function must be non-blocking if it is called in the communication thread,
   * otherwise, if it blocks, it may cause a deadlock.
   * If the session stop is called in the server's bind connect callback, then the session
   * can't be added into the session manager, and the session's bind disconnect event
   * won't be called either.
   */
  151. inline void stop()
  152. {
  153. derived_t& derive = this->derived();
  154. state_t expected = state_t::stopped;
  155. if (this->state_.compare_exchange_strong(expected, state_t::stopped))
  156. return;
  157. expected = state_t::stopping;
  158. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  159. return;
  160. // use promise to get the result of stop
  161. std::promise<state_t> promise;
  162. std::future<state_t> future = promise.get_future();
  163. // use derfer to ensure the promise's value must be seted.
  164. detail::defer_event pg
  165. {
  166. [this, p = std::move(promise)]() mutable
  167. {
  168. p.set_value(this->state_.load());
  169. }
  170. };
  171. derive.post_event([&derive, this_ptr = derive.selfptr(), pg = std::move(pg)]
  172. (event_queue_guard<derived_t> g) mutable
  173. {
  174. derive._do_disconnect(asio::error::operation_aborted, derive.selfptr(), defer_event
  175. {
  176. [&derive, this_ptr = std::move(this_ptr), pg = std::move(pg)]
  177. (event_queue_guard<derived_t> g) mutable
  178. {
  179. detail::ignore_unused(derive, pg, g);
  180. // the "pg" should destroyed before the "g", otherwise if the "g"
  181. // is destroyed before "pg", the next event maybe called, then the
  182. // state maybe change to not stopped.
  183. {
  184. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  185. }
  186. }, std::move(g)
  187. });
  188. });
  189. // use this to ensure the client is stopped completed when the stop is called not in the io_context thread
  190. while (!derive.running_in_this_thread() && !derive.sessions_.io_->running_in_this_thread())
  191. {
  192. std::future_status status = future.wait_for(std::chrono::milliseconds(100));
  193. if (status == std::future_status::ready)
  194. {
  195. ASIO2_ASSERT(future.get() == state_t::stopped);
  196. break;
  197. }
  198. else
  199. {
  200. if (derive.get_thread_id() == std::thread::id{})
  201. break;
  202. if (derive.sessions_.io_->get_thread_id() == std::thread::id{})
  203. break;
  204. if (derive.io_->context().stopped())
  205. break;
  206. }
  207. }
  208. }
  209. /**
  210. * @brief check whether the session is stopped
  211. */
  212. inline bool is_stopped() const noexcept
  213. {
  214. return (this->state_ == state_t::stopped);
  215. }
  216. public:
  217. /**
  218. * @brief get the remote address
  219. */
  220. inline std::string get_remote_address() const noexcept
  221. {
  222. try
  223. {
  224. return this->remote_endpoint_.address().to_string();
  225. }
  226. catch (system_error & e) { set_last_error(e); }
  227. return std::string();
  228. }
  229. /**
  230. * @brief get the remote address, same as get_remote_address
  231. */
  232. inline std::string remote_address() const noexcept
  233. {
  234. return this->get_remote_address();
  235. }
  236. /**
  237. * @brief get the remote port
  238. */
  239. inline unsigned short get_remote_port() const noexcept
  240. {
  241. return this->remote_endpoint_.port();
  242. }
  243. /**
  244. * @brief get the remote port, same as get_remote_port
  245. */
  246. inline unsigned short remote_port() const noexcept
  247. {
  248. return this->get_remote_port();
  249. }
  250. /**
  251. * @brief get this object hash key,used for session map
  252. */
  253. inline key_type hash_key() const noexcept
  254. {
  255. // after test, there are a lot of hash collisions for asio::ip::udp::endpoint.
  256. // so the map key can't be the hash result of asio::ip::udp::endpoint, it must
  257. // be the asio::ip::udp::endpoint itself.
  258. return this->remote_endpoint_;
  259. }
  260. /**
  261. * @brief get the kcp stream component
  262. */
  263. inline kcp_stream_cp<derived_t, args_t>* get_kcp_stream() noexcept
  264. {
  265. return this->kcp_stream_.get();
  266. }
  267. /**
  268. * @brief get the kcp stream component
  269. */
  270. inline const kcp_stream_cp<derived_t, args_t>* get_kcp_stream() const noexcept
  271. {
  272. return this->kcp_stream_.get();
  273. }
  274. /**
  275. * @brief get the kcp pointer, just used for kcp mode
  276. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  277. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  278. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  279. */
  280. inline kcp::ikcpcb* get_kcp() noexcept
  281. {
  282. return (this->kcp_stream_ ? this->kcp_stream_->kcp_ : nullptr);
  283. }
  284. /**
  285. * @brief get the kcp pointer, just used for kcp mode
  286. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  287. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  288. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  289. */
  290. inline const kcp::ikcpcb* get_kcp() const noexcept
  291. {
  292. return (this->kcp_stream_ ? this->kcp_stream_->kcp_ : nullptr);
  293. }
  294. /**
  295. * @brief get the kcp pointer, just used for kcp mode. same as get_kcp
  296. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  297. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  298. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  299. */
  300. inline kcp::ikcpcb* kcp() noexcept
  301. {
  302. return this->get_kcp();
  303. }
  304. /**
  305. * @brief get the kcp pointer, just used for kcp mode. same as get_kcp
  306. * default mode : ikcp_nodelay(kcp, 0, 10, 0, 0);
  307. * generic mode : ikcp_nodelay(kcp, 0, 10, 0, 1);
  308. * fast mode : ikcp_nodelay(kcp, 1, 10, 2, 1);
  309. */
  310. inline const kcp::ikcpcb* kcp() const noexcept
  311. {
  312. return this->get_kcp();
  313. }
  314. protected:
  315. template<typename C>
  316. inline void _do_init(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  317. {
  318. detail::ignore_unused(this_ptr, ecs);
  319. // reset the variable to default status
  320. this->derived().reset_connect_time();
  321. this->derived().update_alive_time();
  322. }
  323. template<typename C, typename DeferEvent>
  324. inline void _handle_connect(
  325. const error_code& ec, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs,
  326. DeferEvent chain)
  327. {
  328. detail::ignore_unused(ec);
  329. ASIO2_ASSERT(!ec);
  330. ASIO2_ASSERT(this->derived().sessions_.io_->running_in_this_thread());
  331. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_kcp_t>)
  332. {
  333. std::string& data = *(this->first_data_);
  334. // step 3 : server recvd syn from client (the first data is syn)
  335. // Check whether the first data packet is SYN handshake
  336. if (!kcp::is_kcphdr_syn(data))
  337. {
  338. set_last_error(asio::error::address_family_not_supported);
  339. this->derived()._fire_handshake(this_ptr);
  340. this->derived()._do_disconnect(asio::error::address_family_not_supported,
  341. std::move(this_ptr), std::move(chain));
  342. return;
  343. }
  344. this->kcp_stream_ = std::make_unique<kcp_stream_cp<derived_t, args_t>>(
  345. this->derived(), this->io_->context());
  346. this->kcp_stream_->_post_handshake(std::move(this_ptr), std::move(ecs), std::move(chain));
  347. }
  348. else
  349. {
  350. this->kcp_stream_.reset();
  351. this->derived()._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  352. }
  353. }
  354. template<typename C, typename DeferEvent>
  355. inline void _do_start(
  356. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  357. {
  358. derived_t& derive = this->derived();
  359. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_kcp_t>)
  360. {
  361. ASIO2_ASSERT(this->kcp_stream_);
  362. if (this->kcp_stream_)
  363. this->kcp_stream_->send_fin_ = true;
  364. }
  365. else
  366. {
  367. ASIO2_ASSERT(!this->kcp_stream_);
  368. }
  369. derive.post_event(
  370. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), e = chain.move_event()]
  371. (event_queue_guard<derived_t> g) mutable
  372. {
  373. defer_event chain(std::move(e), std::move(g));
  374. if (!derive.is_started())
  375. {
  376. derive._do_disconnect(asio::error::operation_aborted, std::move(this_ptr), std::move(chain));
  377. return;
  378. }
  379. derive._join_session(std::move(this_ptr), std::move(ecs), std::move(chain));
  380. });
  381. }
  382. template<typename DeferEvent>
  383. inline void _handle_disconnect(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  384. {
  385. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  386. ASIO2_ASSERT(this->state_ == state_t::stopped);
  387. ASIO2_ASSERT(this->reading_ == false);
  388. set_last_error(ec);
  389. this->derived()._rdc_stop();
  390. super::_handle_disconnect(ec, std::move(this_ptr), std::move(chain));
  391. }
  392. template<typename DeferEvent>
  393. inline void _do_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  394. {
  395. this->derived()._post_stop(ec, std::move(this_ptr), std::move(chain));
  396. }
  397. template<typename DeferEvent>
  398. inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  399. {
  400. // if we use asio::dispatch in server's _exec_stop, then we need:
  401. // put _kcp_stop in front of super::stop, othwise the super::stop will execute
  402. // "counter_ptr_.reset()", it will cause the udp server's _exec_stop is called,
  403. // and the _handle_stop is called, and the socket will be closed, then the
  404. // _kcp_stop send kcphdr will failed.
  405. // but if we use asio::post in server's _exec_stop, there is no such problem.
  406. if (this->kcp_stream_)
  407. this->kcp_stream_->_kcp_stop();
  408. // call the base class stop function
  409. super::stop();
  410. // call CRTP polymorphic stop
  411. this->derived()._handle_stop(ec, std::move(this_ptr), std::move(chain));
  412. }
  413. template<typename DeferEvent>
  414. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  415. {
  416. detail::ignore_unused(ec, this_ptr, chain);
  417. ASIO2_ASSERT(this->state_ == state_t::stopped);
  418. }
  419. template<typename C, typename DeferEvent>
  420. inline void _join_session(
  421. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  422. {
  423. this->sessions_.emplace(this_ptr,
  424. [this, this_ptr, ecs = std::move(ecs), chain = std::move(chain)]
  425. (bool inserted) mutable
  426. {
  427. if (inserted)
  428. this->derived()._start_recv(std::move(this_ptr), std::move(ecs), std::move(chain));
  429. else
  430. this->derived()._do_disconnect(asio::error::address_in_use, std::move(this_ptr), std::move(chain));
  431. });
  432. }
  433. template<typename C, typename DeferEvent>
  434. inline void _start_recv(
  435. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  436. {
  437. // to avlid the user call stop in another thread,then it may be socket.async_read_some
  438. // and socket.close be called at the same time
  439. asio::dispatch(this->io_->context(), make_allocator(this->wallocator_,
  440. [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  441. () mutable
  442. {
  443. using condition_lowest_type = typename ecs_t<C>::condition_lowest_type;
  444. detail::ignore_unused(chain);
  445. // start the timer of check silence timeout
  446. this->derived()._post_silence_timer(this->silence_timeout_, this_ptr);
  447. if constexpr (std::is_same_v<condition_lowest_type, asio2::detail::use_kcp_t>)
  448. {
  449. detail::ignore_unused(this_ptr, ecs);
  450. }
  451. else
  452. {
  453. std::string& data = *(this->first_data_);
  454. this->derived()._fire_recv(this_ptr, ecs, data);
  455. }
  456. this->first_data_.reset();
  457. ASIO2_ASSERT(!this->first_data_);
  458. ASIO2_ASSERT(this->reading_ == false);
  459. }));
  460. }
  461. protected:
  462. template<class Data, class Callback>
  463. inline bool _do_send(Data& data, Callback&& callback)
  464. {
  465. if (!this->kcp_stream_)
  466. return this->derived()._udp_send_to(
  467. this->remote_endpoint_, data, std::forward<Callback>(callback));
  468. return this->kcp_stream_->_kcp_send(data, std::forward<Callback>(callback));
  469. }
  470. template<class Data>
  471. inline send_data_t _rdc_convert_to_send_data(Data& data) noexcept
  472. {
  473. auto buffer = asio::buffer(data);
  474. return send_data_t{ reinterpret_cast<
  475. std::string_view::const_pointer>(buffer.data()),buffer.size() };
  476. }
  477. template<class Invoker>
  478. inline void _rdc_invoke_with_none(const error_code& ec, Invoker& invoker)
  479. {
  480. if (invoker)
  481. invoker(ec, send_data_t{}, recv_data_t{});
  482. }
  483. template<class Invoker>
  484. inline void _rdc_invoke_with_recv(const error_code& ec, Invoker& invoker, recv_data_t data)
  485. {
  486. if (invoker)
  487. invoker(ec, send_data_t{}, data);
  488. }
  489. template<class Invoker>
  490. inline void _rdc_invoke_with_send(const error_code& ec, Invoker& invoker, send_data_t data)
  491. {
  492. if (invoker)
  493. invoker(ec, data, recv_data_t{});
  494. }
  495. protected:
  // this function should never be called
  497. template<typename C>
  498. inline void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  499. {
  500. ASIO2_ASSERT(false);
  501. this->derived()._udp_post_recv(std::move(this_ptr), std::move(ecs));
  502. }
  503. template<typename C>
  504. inline void _handle_recv(
  505. const error_code& ec, std::size_t bytes_recvd,
  506. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  507. {
  508. this->derived()._udp_handle_recv(ec, bytes_recvd, this_ptr, ecs);
  509. }
  510. template<typename C>
  511. inline void _fire_recv(
  512. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
  513. {
  514. data = detail::call_data_filter_before_recv(this->derived(), data);
  515. this->listener_.notify(event_type::recv, this_ptr, data);
  516. this->derived()._rdc_handle_recv(this_ptr, ecs, data);
  517. }
  518. inline void _fire_handshake(std::shared_ptr<derived_t>& this_ptr)
  519. {
  520. // the _fire_handshake must be executed in the thread 0.
  521. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread());
  522. this->listener_.notify(event_type::handshake, this_ptr);
  523. }
  524. template<typename C>
  525. inline void _fire_connect(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  526. {
  527. // the _fire_connect must be executed in the thread 0.
  528. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread());
  529. #if defined(_DEBUG) || defined(DEBUG)
  530. ASIO2_ASSERT(this->is_disconnect_called_ == false);
  531. #endif
  532. this->derived()._rdc_start(this_ptr, ecs);
  533. this->listener_.notify(event_type::connect, this_ptr);
  534. }
  535. inline void _fire_disconnect(std::shared_ptr<derived_t>& this_ptr)
  536. {
  537. // the _fire_disconnect must be executed in the thread 0.
  538. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread());
  539. #if defined(_DEBUG) || defined(DEBUG)
  540. this->is_disconnect_called_ = true;
  541. #endif
  542. this->listener_.notify(event_type::disconnect, this_ptr);
  543. }
  544. protected:
  545. /**
  546. * @brief get the recv/read allocator object reference
  547. */
  548. inline auto & rallocator() noexcept { return this->wallocator_; }
  549. /**
  550. * @brief get the send/write allocator object reference
  551. */
  552. inline auto & wallocator() noexcept { return this->wallocator_; }
  553. protected:
  /// The memory to use for handler-based custom memory allocation. Used for send/write.
  555. handler_memory<std::false_type, assizer<args_t>> wallocator_;
  556. std::unique_ptr<kcp_stream_cp<derived_t, args_t>> kcp_stream_;
  557. std::uint32_t kcp_conv_ = 0;
  558. /// first recvd data packet
  559. std::unique_ptr<std::string> first_data_;
  560. #if defined(_DEBUG) || defined(DEBUG)
  561. bool is_disconnect_called_ = false;
  562. #endif
  563. };
  564. }
  565. namespace asio2
  566. {
  567. using udp_session_args = detail::template_args_udp_session;
  568. template<class derived_t, class args_t>
  569. using udp_session_impl_t = detail::udp_session_impl_t<derived_t, args_t>;
  570. template<class derived_t>
  571. class udp_session_t : public detail::udp_session_impl_t<derived_t, detail::template_args_udp_session>
  572. {
  573. public:
  574. using detail::udp_session_impl_t<derived_t, detail::template_args_udp_session>::udp_session_impl_t;
  575. };
  576. class udp_session : public udp_session_t<udp_session>
  577. {
  578. public:
  579. using udp_session_t<udp_session>::udp_session_t;
  580. };
  581. }
  582. #include <asio2/base/detail/pop_options.hpp>
  583. #endif // !__ASIO2_UDP_SESSION_HPP__