udp_server.hpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_UDP_SERVER_HPP__
  11. #define __ASIO2_UDP_SERVER_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/base/server.hpp>
  17. #include <asio2/udp/udp_session.hpp>
  18. namespace asio2::detail
  19. {
  20. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  21. ASIO2_CLASS_FORWARD_DECLARE_UDP_BASE;
  22. ASIO2_CLASS_FORWARD_DECLARE_UDP_SERVER;
  23. template<class derived_t, class session_t>
  24. class udp_server_impl_t : public server_impl_t<derived_t, session_t>, public udp_tag
  25. {
  26. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  27. ASIO2_CLASS_FRIEND_DECLARE_UDP_BASE;
  28. ASIO2_CLASS_FRIEND_DECLARE_UDP_SERVER;
  29. public:
  30. using super = server_impl_t <derived_t, session_t>;
  31. using self = udp_server_impl_t<derived_t, session_t>;
  32. using session_type = session_t;
  33. public:
  34. /**
  35. * @brief constructor
  36. */
  37. explicit udp_server_impl_t(
  38. std::size_t init_buf_size = udp_frame_size,
  39. std::size_t max_buf_size = max_buffer_size,
  40. std::size_t concurrency = 1
  41. )
  42. : super(concurrency)
  43. , acceptor_(std::make_shared<asio::ip::udp::socket>(this->io_->context()))
  44. , remote_endpoint_()
  45. , buffer_(init_buf_size, max_buf_size)
  46. {
  47. }
  48. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  49. explicit udp_server_impl_t(
  50. std::size_t init_buf_size,
  51. std::size_t max_buf_size,
  52. Scheduler&& scheduler
  53. )
  54. : super(std::forward<Scheduler>(scheduler))
  55. , acceptor_(std::make_shared<asio::ip::udp::socket>(this->io_->context()))
  56. , remote_endpoint_()
  57. , buffer_(init_buf_size, max_buf_size)
  58. {
  59. }
  60. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  61. explicit udp_server_impl_t(Scheduler&& scheduler)
  62. : udp_server_impl_t(udp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))
  63. {
  64. }
  65. /**
  66. * @brief destructor
  67. */
  68. ~udp_server_impl_t()
  69. {
  70. this->stop();
  71. }
  72. /**
  73. * @brief start the server
  74. * @param host - A string identifying a location. May be a descriptive name or
  75. * a numeric address string.
  76. * @param service - A string identifying the requested service. This may be a
  77. * descriptive name or a numeric string corresponding to a port number.
  78. */
  79. template<typename String, typename StrOrInt, typename... Args>
  80. inline bool start(String&& host, StrOrInt&& service, Args&&... args)
  81. {
  82. return this->derived()._do_start(
  83. std::forward<String>(host), std::forward<StrOrInt>(service),
  84. ecs_helper::make_ecs('0', std::forward<Args>(args)...));
  85. }
  86. /**
  87. * @brief stop the server
  88. * You can call this function on the communication thread and anywhere to stop the server.
  89. */
  90. inline void stop()
  91. {
  92. if (this->is_iopool_stopped())
  93. return;
  94. derived_t& derive = this->derived();
  95. derive.io_->unregobj(&derive);
  96. derive.post([&derive]() mutable
  97. {
  98. derive._do_stop(asio::error::operation_aborted, derive.selfptr());
  99. });
  100. this->stop_iopool();
  101. }
  102. /**
  103. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  104. * After this function is called, this class object cannot be used again.
  105. */
  106. inline void destroy()
  107. {
  108. derived_t& derive = this->derived();
  109. derive.acceptor_.reset();
  110. super::destroy();
  111. }
  112. /**
  113. * @brief check whether the server is started
  114. */
  115. inline bool is_started() const { return (super::is_started() && this->acceptor_->is_open()); }
  116. /**
  117. * @brief check whether the server is stopped
  118. */
  119. inline bool is_stopped() const
  120. {
  121. return (this->state_ == state_t::stopped && !this->acceptor_->is_open());
  122. }
  123. public:
  124. /**
  125. * @brief bind recv listener
  126. * @param fun - a user defined callback function.
  127. * @param obj - a pointer or reference to a class object, this parameter can be none.
  128. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  129. * the class object's pointer or reference.
  130. * Function signature : void(std::shared_ptr<asio2::udp_session>& session_ptr, std::string_view data)
  131. */
  132. template<class F, class ...C>
  133. inline derived_t & bind_recv(F&& fun, C&&... obj)
  134. {
  135. this->listener_.bind(event_type::recv,
  136. observer_t<std::shared_ptr<session_t>&, std::string_view>(
  137. std::forward<F>(fun), std::forward<C>(obj)...));
  138. return (this->derived());
  139. }
  140. /**
  141. * @brief bind connect listener
  142. * @param fun - a user defined callback function.
  143. * @param obj - a pointer or reference to a class object, this parameter can be none.
  144. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  145. * the class object's pointer or reference.
  146. * This notification is invoked after the connection is fully established,
  147. * and only after the connect/handshake/upgrade are completed.
  148. * Function signature : void(std::shared_ptr<asio2::udp_session>& session_ptr)
  149. */
  150. template<class F, class ...C>
  151. inline derived_t & bind_connect(F&& fun, C&&... obj)
  152. {
  153. this->listener_.bind(event_type::connect,
  154. observer_t<std::shared_ptr<session_t>&>(
  155. std::forward<F>(fun), std::forward<C>(obj)...));
  156. return (this->derived());
  157. }
  158. /**
  159. * @brief bind disconnect listener
  160. * @param fun - a user defined callback function.
  161. * @param obj - a pointer or reference to a class object, this parameter can be none.
  162. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  163. * the class object's pointer or reference.
  164. * This notification is invoked before the connection is disconnected, you can call
  165. * get_last_error/last_error_msg, etc, to get the disconnected error information
  166. * Function signature : void(std::shared_ptr<asio2::udp_session>& session_ptr)
  167. */
  168. template<class F, class ...C>
  169. inline derived_t & bind_disconnect(F&& fun, C&&... obj)
  170. {
  171. this->listener_.bind(event_type::disconnect,
  172. observer_t<std::shared_ptr<session_t>&>(
  173. std::forward<F>(fun), std::forward<C>(obj)...));
  174. return (this->derived());
  175. }
  176. /**
  177. * @brief bind init listener,we should set socket options at here
  178. * @param fun - a user defined callback function.
  179. * @param obj - a pointer or reference to a class object, this parameter can be none.
  180. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  181. * the class object's pointer or reference.
  182. * This notification is called after the socket is opened.
  183. * You can set the socket option in this notification.
  184. * Function signature : void()
  185. */
  186. template<class F, class ...C>
  187. inline derived_t & bind_init(F&& fun, C&&... obj)
  188. {
  189. this->listener_.bind(event_type::init, observer_t<>(
  190. std::forward<F>(fun), std::forward<C>(obj)...));
  191. return (this->derived());
  192. }
  193. /**
  194. * @brief bind start listener
  195. * @param fun - a user defined callback function.
  196. * @param obj - a pointer or reference to a class object, this parameter can be none.
  197. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  198. * the class object's pointer or reference.
  199. * This notification is called after the server starts up, whether successful or unsuccessful
  200. * Function signature : void()
  201. */
  202. template<class F, class ...C>
  203. inline derived_t & bind_start(F&& fun, C&&... obj)
  204. {
  205. this->listener_.bind(event_type::start, observer_t<>(
  206. std::forward<F>(fun), std::forward<C>(obj)...));
  207. return (this->derived());
  208. }
  209. /**
  210. * @brief bind stop listener
  211. * @param fun - a user defined callback function.
  212. * @param obj - a pointer or reference to a class object, this parameter can be none.
  213. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  214. * the class object's pointer or reference.
  215. * This notification is called before the server is ready to stop
  216. * Function signature : void()
  217. */
  218. template<class F, class ...C>
  219. inline derived_t & bind_stop(F&& fun, C&&... obj)
  220. {
  221. this->listener_.bind(event_type::stop, observer_t<>(
  222. std::forward<F>(fun), std::forward<C>(obj)...));
  223. return (this->derived());
  224. }
  225. /**
  226. * @brief bind kcp handshake listener, just used fo kcp mode
  227. * @param fun - a user defined callback function.
  228. * @param obj - a pointer or reference to a class object, this parameter can be none.
  229. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  230. * the class object's pointer or reference.
  231. * Function signature : void(std::shared_ptr<asio2::udp_session>& session_ptr)
  232. */
  233. template<class F, class ...C>
  234. inline derived_t & bind_handshake(F&& fun, C&&... obj)
  235. {
  236. this->listener_.bind(event_type::handshake,
  237. observer_t<std::shared_ptr<session_t>&>(
  238. std::forward<F>(fun), std::forward<C>(obj)...));
  239. return (this->derived());
  240. }
  241. public:
  242. /**
  243. * @brief get the acceptor reference
  244. */
  245. inline asio::ip::udp::socket & acceptor() noexcept { return *(this->acceptor_); }
  246. /**
  247. * @brief get the acceptor reference
  248. */
  249. inline asio::ip::udp::socket const& acceptor() const noexcept { return *(this->acceptor_); }
  250. protected:
  251. template<typename String, typename StrOrInt, typename C>
  252. inline bool _do_start(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
  253. {
  254. derived_t& derive = this->derived();
  255. // if log is enabled, init the log first, otherwise when "Too many open files" error occurs,
  256. // the log file will be created failed too.
  257. #if defined(ASIO2_ENABLE_LOG)
  258. asio2::detail::get_logger();
  259. #endif
  260. this->start_iopool();
  261. if (!this->is_iopool_started())
  262. {
  263. set_last_error(asio::error::operation_aborted);
  264. return false;
  265. }
  266. asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable
  267. {
  268. detail::ignore_unused(this_ptr);
  269. // init the running thread id
  270. derive.io_->init_thread_id();
  271. });
  272. // use promise to get the result of async accept
  273. std::promise<error_code> promise;
  274. std::future<error_code> future = promise.get_future();
  275. // use derfer to ensure the promise's value must be seted.
  276. detail::defer_event pg
  277. {
  278. [promise = std::move(promise)]() mutable
  279. {
  280. promise.set_value(get_last_error());
  281. }
  282. };
  283. derive.post(
  284. [this, this_ptr = derive.selfptr(), ecs = std::move(ecs), pg = std::move(pg),
  285. host = std::forward<String>(host), port = std::forward<StrOrInt>(port)]
  286. () mutable
  287. {
  288. derived_t& derive = this->derived();
  289. state_t expected = state_t::stopped;
  290. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  291. {
  292. // if the state is not stopped, set the last error to already_started
  293. set_last_error(asio::error::already_started);
  294. return;
  295. }
  296. // must read/write ecs in the io_context thread.
  297. derive.ecs_ = ecs;
  298. derive.io_->regobj(&derive);
  299. #if defined(_DEBUG) || defined(DEBUG)
  300. this->sessions_.is_all_session_stop_called_ = false;
  301. this->is_stop_called_ = false;
  302. #endif
  303. // convert to string maybe throw some exception.
  304. std::string h = detail::to_string(std::move(host));
  305. std::string p = detail::to_string(std::move(port));
  306. expected = state_t::starting;
  307. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  308. {
  309. ASIO2_ASSERT(false);
  310. derive._handle_start(asio::error::operation_aborted, std::move(this_ptr), std::move(ecs));
  311. return;
  312. }
  313. super::start();
  314. // should hold the server shared ptr too, if server is constructed with iopool, and
  315. // server is a tmp local variable, then the server maybe destroyed before sessions.
  316. // so we need hold this ptr to ensure server must be destroyed after sessions.
  317. this->counter_ptr_ = std::shared_ptr<void>((void*)1, [&derive, this_ptr](void*) mutable
  318. {
  319. derive._exec_stop(asio::error::operation_aborted, std::move(this_ptr));
  320. });
  321. error_code ec, ec_ignore;
  322. // parse address and port
  323. asio::ip::udp::resolver resolver(this->io_->context());
  324. auto results = resolver.resolve(h, p,
  325. asio::ip::resolver_base::flags::passive |
  326. asio::ip::resolver_base::flags::address_configured, ec);
  327. if (ec)
  328. {
  329. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  330. return;
  331. }
  332. if (results.empty())
  333. {
  334. derive._handle_start(asio::error::host_not_found, std::move(this_ptr), std::move(ecs));
  335. return;
  336. }
  337. asio::ip::udp::endpoint endpoint = *results.begin();
  338. this->acceptor_->cancel(ec_ignore);
  339. this->acceptor_->close(ec_ignore);
  340. this->acceptor_->open(endpoint.protocol(), ec);
  341. if (ec)
  342. {
  343. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  344. return;
  345. }
  346. // when you close socket in linux system,and start socket
  347. // immediate,you will get like this "the address is in use",
  348. // and bind is failed,but i'm suer i close the socket correct
  349. // already before,why does this happen? the reasion is the
  350. // socket option "TIME_WAIT",although you close the socket,
  351. // but the system not release the socket,util 2~4 seconds later,
  352. // so we can use the SO_REUSEADDR option to avoid this problem,
  353. // like below
  354. // set port reuse
  355. this->acceptor_->set_option(asio::ip::udp::socket::reuse_address(true), ec_ignore);
  356. //// Join the multicast group. you can set this option in the on_init(_fire_init) function.
  357. //this->acceptor_->set_option(
  358. // // for ipv6, the host must be a ipv6 address like 0::0
  359. // asio::ip::multicast::join_group(asio::ip::make_address("ff31::8000:1234")));
  360. // // for ipv4, the host must be a ipv4 address like 0.0.0.0
  361. // //asio::ip::multicast::join_group(asio::ip::make_address("239.255.0.1")));
  362. clear_last_error();
  363. derive._fire_init();
  364. this->acceptor_->bind(endpoint, ec);
  365. if (ec)
  366. {
  367. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  368. return;
  369. }
  370. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  371. });
  372. if (!derive.io_->running_in_this_thread())
  373. {
  374. set_last_error(future.get());
  375. return static_cast<bool>(!get_last_error());
  376. }
  377. else
  378. {
  379. set_last_error(asio::error::in_progress);
  380. }
  381. // if the state is stopped , the return value is "is_started()".
  382. // if the state is stopping, the return value is false, the last error is already_started
  383. // if the state is starting, the return value is false, the last error is already_started
  384. // if the state is started , the return value is true , the last error is already_started
  385. return derive.is_started();
  386. }
  387. template<typename C>
  388. inline void _handle_start(error_code ec, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  389. {
  390. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  391. // Whether the startup succeeds or fails, always call fire_start notification
  392. state_t expected = state_t::starting;
  393. if (!ec)
  394. if (!this->state_.compare_exchange_strong(expected, state_t::started))
  395. ec = asio::error::operation_aborted;
  396. set_last_error(ec);
  397. this->derived()._fire_start();
  398. expected = state_t::started;
  399. if (!ec)
  400. if (!this->state_.compare_exchange_strong(expected, state_t::started))
  401. ec = asio::error::operation_aborted;
  402. if (ec)
  403. {
  404. this->derived()._do_stop(ec, std::move(this_ptr));
  405. return;
  406. }
  407. this->buffer_.consume(this->buffer_.size());
  408. this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
  409. }
  410. inline void _do_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  411. {
  412. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  413. state_t expected = state_t::starting;
  414. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  415. return this->derived()._post_stop(ec, std::move(this_ptr), expected);
  416. expected = state_t::started;
  417. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  418. return this->derived()._post_stop(ec, std::move(this_ptr), expected);
  419. }
  420. inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state)
  421. {
  422. // asio don't allow operate the same socket in multi thread,
  423. // if you close socket in one thread and another thread is
  424. // calling socket's async_... function,it will crash.so we
  425. // must care for operate the socket.when need close the
  426. // socket ,we use the io_context to post a event,make sure the
  427. // socket's close operation is in the same thread.
  428. // psot a recv signal to ensure that all recv events has finished already.
  429. this->derived().post(
  430. [this, ec, old_state, this_ptr = std::move(this_ptr)]() mutable
  431. {
  432. detail::ignore_unused(this, ec, old_state, this_ptr);
  433. // When the code runs here,no new session can be emplace or erase to session_mgr.
  434. // stop all the sessions, the session::stop must be no blocking,
  435. // otherwise it may be cause loop lock.
  436. set_last_error(ec);
  437. ASIO2_ASSERT(this->state_ == state_t::stopping);
  438. // stop all the sessions, the session::stop must be no blocking,
  439. // otherwise it may be cause loop lock.
  440. this->sessions_.quick_for_each([](std::shared_ptr<session_t> & session_ptr) mutable
  441. {
  442. session_ptr->stop();
  443. });
  444. #if defined(_DEBUG) || defined(DEBUG)
  445. this->sessions_.is_all_session_stop_called_ = true;
  446. #endif
  447. if (this->counter_ptr_)
  448. {
  449. this->counter_ptr_.reset();
  450. }
  451. else
  452. {
  453. this->derived()._exec_stop(ec, std::move(this_ptr));
  454. }
  455. });
  456. }
  457. inline void _exec_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  458. {
  459. // use asio::post to ensure this server's _handle_stop is called must be after
  460. // all sessions _handle_stop has been called already.
  461. // is use asio::dispatch, session's _handle_stop maybe called first.
  462. asio::post(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
  463. [this, ec, this_ptr = std::move(this_ptr)]() mutable
  464. {
  465. state_t expected = state_t::stopping;
  466. if (this->state_.compare_exchange_strong(expected, state_t::stopped))
  467. {
  468. this->derived()._handle_stop(ec, std::move(this_ptr));
  469. }
  470. else
  471. {
  472. ASIO2_ASSERT(false);
  473. }
  474. }));
  475. }
  476. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  477. {
  478. set_last_error(ec);
  479. this->derived()._fire_stop();
  480. // call the base class stop function
  481. super::stop();
  482. error_code ec_ignore{};
  483. // Call shutdown() to indicate that you will not write any more data to the socket.
  484. this->acceptor_->shutdown(asio::socket_base::shutdown_both, ec_ignore);
  485. this->acceptor_->cancel(ec_ignore);
  486. // Call close,otherwise the _handle_recv will never return
  487. this->acceptor_->close(ec_ignore);
  488. ASIO2_ASSERT(this->state_ == state_t::stopped);
  489. }
  490. template<typename C>
  491. inline void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  492. {
  493. if (!this->derived().is_started())
  494. {
  495. if (this->derived().state_ == state_t::started)
  496. {
  497. this->derived()._do_stop(asio2::get_last_error(), std::move(this_ptr));
  498. }
  499. return;
  500. }
  501. #if defined(_DEBUG) || defined(DEBUG)
  502. ASIO2_ASSERT(this->derived().post_recv_counter_.load() == 0);
  503. this->derived().post_recv_counter_++;
  504. #endif
  505. this->acceptor_->async_receive_from(
  506. this->buffer_.prepare(this->buffer_.pre_size()),
  507. this->remote_endpoint_,
  508. make_allocator(this->rallocator_, [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  509. (const error_code& ec, std::size_t bytes_recvd) mutable
  510. {
  511. #if defined(_DEBUG) || defined(DEBUG)
  512. this->derived().post_recv_counter_--;
  513. #endif
  514. this->derived()._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  515. }));
  516. }
  517. template<typename C>
  518. inline void _handle_recv(
  519. const error_code& ec, std::size_t bytes_recvd,
  520. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  521. {
  522. set_last_error(ec);
  523. if (!this->derived().is_started())
  524. {
  525. if (this->derived().state_ == state_t::started)
  526. {
  527. this->derived()._do_stop(ec, std::move(this_ptr));
  528. }
  529. return;
  530. }
  531. if (ec == asio::error::operation_aborted)
  532. {
  533. this->derived()._do_stop(ec, std::move(this_ptr));
  534. return;
  535. }
  536. this->buffer_.commit(bytes_recvd);
  537. if (!ec)
  538. {
  539. std::string_view data = std::string_view(static_cast<std::string_view::const_pointer>
  540. (this->buffer_.data().data()), bytes_recvd);
  541. // first we find whether the session is in the session_mgr pool already,if not ,
  542. // we new a session and put it into the session_mgr pool
  543. std::shared_ptr<session_t> session_ptr = this->sessions_.find(this->remote_endpoint_);
  544. if (!session_ptr)
  545. {
  546. this->derived()._handle_accept(ec, data, session_ptr, ecs);
  547. }
  548. else
  549. {
  550. session_ptr->_handle_recv(ec, bytes_recvd, session_ptr, ecs);
  551. }
  552. }
  553. else
  554. {
  555. #ifdef ASIO2_STOP_SESSION_WHEN_RECVD_0BYTES
  556. // has error, and bytes_recvd == 0
  557. if (bytes_recvd == 0)
  558. {
  559. std::shared_ptr<session_t> session_ptr = this->sessions_.find(this->remote_endpoint_);
  560. if (session_ptr)
  561. {
  562. ASIO2_LOG_INFOR("udp session stoped by recvd 0 bytes: {}",
  563. session_ptr->get_remote_address());
  564. session_ptr->stop();
  565. }
  566. }
  567. #endif
  568. }
  569. this->buffer_.consume(this->buffer_.size());
  570. if (bytes_recvd == this->buffer_.pre_size())
  571. {
  572. this->buffer_.pre_size((std::min)(this->buffer_.pre_size() * 2, this->buffer_.max_size()));
  573. }
  574. this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
  575. }
  576. template<typename... Args>
  577. inline std::shared_ptr<session_t> _make_session(Args&&... args)
  578. {
  579. return std::make_shared<session_t>(
  580. std::forward<Args>(args)...,
  581. this->sessions_,
  582. this->listener_,
  583. this->io_,
  584. this->buffer_.pre_size(),
  585. this->buffer_.max_size(),
  586. this->buffer_,
  587. this->acceptor_,
  588. this->remote_endpoint_);
  589. }
  590. template<typename C>
  591. inline void _handle_accept(
  592. const error_code& ec, std::string_view first_data,
  593. std::shared_ptr<session_t> session_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  594. {
  595. session_ptr = this->derived()._make_session();
  596. session_ptr->counter_ptr_ = this->counter_ptr_;
  597. session_ptr->first_data_ = std::make_unique<std::string>(first_data);
  598. session_ptr->kcp_conv_ = this->derived()._make_kcp_conv(first_data, ecs);
  599. session_ptr->start(detail::to_shared_ptr(ecs->clone()));
  600. }
  601. template<typename C>
  602. inline std::uint32_t _do_make_kcp_conv(std::string_view first_data, std::shared_ptr<ecs_t<C>>& ecs)
  603. {
  604. detail::ignore_unused(ecs);
  605. std::uint32_t conv = 0;
  606. if (kcp::is_kcphdr_syn(first_data))
  607. {
  608. kcp::kcphdr syn = kcp::to_kcphdr(first_data);
  609. // the syn.th_ack is the kcp conv
  610. if (syn.th_ack == 0)
  611. {
  612. conv = this->kcp_convs_.fetch_add(1);
  613. if (conv == 0)
  614. conv = this->kcp_convs_.fetch_add(1);
  615. }
  616. else
  617. {
  618. conv = syn.th_ack;
  619. }
  620. }
  621. return conv;
  622. }
  623. template<typename C>
  624. inline std::uint32_t _make_kcp_conv(std::string_view first_data, std::shared_ptr<ecs_t<C>>& ecs)
  625. {
  626. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_kcp_t>)
  627. {
  628. return this->_do_make_kcp_conv(first_data, ecs);
  629. }
  630. else
  631. {
  632. detail::ignore_unused(first_data, ecs);
  633. return 0;
  634. }
  635. }
  636. inline void _fire_init()
  637. {
  638. // the _fire_init must be executed in the thread 0.
  639. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  640. ASIO2_ASSERT(!get_last_error());
  641. this->listener_.notify(event_type::init);
  642. }
  643. inline void _fire_start()
  644. {
  645. // the _fire_start must be executed in the thread 0.
  646. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  647. #if defined(_DEBUG) || defined(DEBUG)
  648. ASIO2_ASSERT(this->is_stop_called_ == false);
  649. #endif
  650. this->listener_.notify(event_type::start);
  651. }
  652. inline void _fire_stop()
  653. {
  654. // the _fire_stop must be executed in the thread 0.
  655. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  656. #if defined(_DEBUG) || defined(DEBUG)
  657. this->is_stop_called_ = true;
  658. #endif
  659. this->listener_.notify(event_type::stop);
  660. }
  661. protected:
  662. /// acceptor to accept client connection
  663. std::shared_ptr<asio::ip::udp::socket> acceptor_;
  664. /// endpoint for udp
  665. asio::ip::udp::endpoint remote_endpoint_;
  666. /// buffer
  667. asio2::buffer_wrap<asio2::linear_buffer> buffer_;
  668. std::atomic<std::uint32_t> kcp_convs_ = 1;
  669. #if defined(_DEBUG) || defined(DEBUG)
  670. bool is_stop_called_ = false;
  671. #endif
  672. };
  673. }
  674. namespace asio2
  675. {
  676. template<class derived_t, class session_t>
  677. using udp_server_impl_t = detail::udp_server_impl_t<derived_t, session_t>;
  678. /**
  679. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  680. * asio::error::no_descriptors - Too many open files
  681. */
  682. template<class session_t>
  683. class udp_server_t : public detail::udp_server_impl_t<udp_server_t<session_t>, session_t>
  684. {
  685. public:
  686. using detail::udp_server_impl_t<udp_server_t<session_t>, session_t>::udp_server_impl_t;
  687. };
  688. /**
  689. * If this object is created as a shared_ptr like std::shared_ptr<asio2::udp_server> server;
  690. * you must call the server->stop() manual when exit, otherwise maybe cause memory leaks.
  691. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  692. * asio::error::no_descriptors - Too many open files
  693. */
  694. using udp_server = udp_server_t<udp_session>;
  695. }
  696. #include <asio2/base/detail/pop_options.hpp>
  697. #endif // !__ASIO2_UDP_SERVER_HPP__