tcp_server.hpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_TCP_SERVER_HPP__
  11. #define __ASIO2_TCP_SERVER_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/base/server.hpp>
  17. #include <asio2/tcp/tcp_session.hpp>
  18. namespace asio2::detail
  19. {
  20. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  21. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  22. ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER;
  23. template<class derived_t, class session_t>
  24. class tcp_server_impl_t : public server_impl_t<derived_t, session_t>, public tcp_tag
  25. {
  26. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  27. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  28. ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER;
  29. public:
  30. using super = server_impl_t <derived_t, session_t>;
  31. using self = tcp_server_impl_t<derived_t, session_t>;
  32. using session_type = session_t;
  33. public:
  34. /**
  35. * @brief constructor
  36. */
  37. explicit tcp_server_impl_t(
  38. std::size_t init_buf_size = tcp_frame_size,
  39. std::size_t max_buf_size = max_buffer_size,
  40. std::size_t concurrency = default_concurrency() + 1 // The 1 is used for tcp acceptor
  41. )
  42. : super(concurrency)
  43. , acceptor_ (std::make_unique<asio::ip::tcp::acceptor>(this->io_->context()))
  44. , acceptor_timer_ (std::make_unique<asio::steady_timer>(this->io_->context()))
  45. , counter_timer_ (std::make_unique<asio::steady_timer>(this->io_->context()))
  46. , init_buffer_size_(init_buf_size)
  47. , max_buffer_size_ ( max_buf_size)
  48. {
  49. }
  50. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  51. explicit tcp_server_impl_t(
  52. std::size_t init_buf_size,
  53. std::size_t max_buf_size,
  54. Scheduler&& scheduler
  55. )
  56. : super(std::forward<Scheduler>(scheduler))
  57. , acceptor_ (std::make_unique<asio::ip::tcp::acceptor>(this->io_->context()))
  58. , acceptor_timer_ (std::make_unique<asio::steady_timer>(this->io_->context()))
  59. , counter_timer_ (std::make_unique<asio::steady_timer>(this->io_->context()))
  60. , init_buffer_size_(init_buf_size)
  61. , max_buffer_size_ ( max_buf_size)
  62. {
  63. }
  64. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  65. explicit tcp_server_impl_t(Scheduler&& scheduler)
  66. : tcp_server_impl_t(tcp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))
  67. {
  68. }
  // -- Supporting initializer_list would make the code of the inherited classes less concise
  70. //template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  71. //explicit tcp_server_impl_t(
  72. // std::size_t init_buf_size,
  73. // std::size_t max_buf_size,
  74. // std::initializer_list<Scheduler> scheduler
  75. //)
  76. // : tcp_server_impl_t(init_buf_size, max_buf_size, std::vector<Scheduler>{std::move(scheduler)})
  77. //{
  78. //}
  79. //template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  80. //explicit tcp_server_impl_t(std::initializer_list<Scheduler> scheduler)
  81. // : tcp_server_impl_t(tcp_frame_size, max_buffer_size, std::move(scheduler))
  82. //{
  83. //}
  84. /**
  85. * @brief destructor
  86. */
  87. ~tcp_server_impl_t()
  88. {
  89. this->stop();
  90. }
  91. /**
  92. * @brief start the server
  93. * @param host - A string identifying a location. May be a descriptive name or
  94. * a numeric address string.
  95. * @param service - A string identifying the requested service. This may be a
  96. * descriptive name or a numeric string corresponding to a port number.
  97. * @param args - The delimiter condition.Valid value types include the following:
  98. * char,std::string,std::string_view,
  99. * function:std::pair<iterator, bool> match_condition(iterator begin, iterator end),
  100. * asio::transfer_at_least,asio::transfer_exactly
  101. * more details see asio::read_until
  102. */
  103. template<typename String, typename StrOrInt, typename... Args>
  104. inline bool start(String&& host, StrOrInt&& service, Args&&... args)
  105. {
  106. return this->derived()._do_start(
  107. std::forward<String>(host), std::forward<StrOrInt>(service),
  108. ecs_helper::make_ecs(asio::transfer_at_least(1), std::forward<Args>(args)...));
  109. }
  110. /**
  111. * @brief stop the server
  112. * You can call this function on the communication thread and anywhere to stop the server.
  113. */
  114. inline void stop()
  115. {
  116. if (this->is_iopool_stopped())
  117. return;
  118. derived_t& derive = this->derived();
  119. derive.io_->unregobj(&derive);
  120. derive.post([&derive]() mutable
  121. {
  122. derive._do_stop(asio::error::operation_aborted, derive.selfptr());
  123. });
  124. this->stop_iopool();
  125. // asio bug , see : https://www.boost.org/users/history/version_1_72_0.html
  126. // Fixed a lost "outstanding work count" that can occur when an asynchronous
  127. // accept operation is automatically restarted.
  128. // Using boost 1.72.0 or above version can avoid this problem (asio 1.16.0)
  129. }
  130. /**
  131. * @brief destroy the content of all member variables, this is used for solve the memory leaks.
  132. * After this function is called, this class object cannot be used again.
  133. */
  134. inline void destroy()
  135. {
  136. derived_t& derive = this->derived();
  137. derive.counter_timer_.reset();
  138. derive.acceptor_timer_.reset();
  139. derive.acceptor_.reset();
  140. super::destroy();
  141. }
  142. /**
  143. * @brief check whether the server is started
  144. */
  145. inline bool is_started() const { return (super::is_started() && this->acceptor_->is_open()); }
  146. /**
  147. * @brief check whether the server is stopped
  148. */
  149. inline bool is_stopped() const
  150. {
  151. return (this->state_ == state_t::stopped && !this->acceptor_->is_open());
  152. }
  153. public:
  154. /**
  155. * @brief bind recv listener
  156. * @param fun - a user defined callback function.
  157. * @param obj - a pointer or reference to a class object, this parameter can be none.
  158. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  159. * the class object's pointer or reference.
  160. * Function signature : void(std::shared_ptr<asio2::tcp_session>& session_ptr, std::string_view data)
  161. */
  162. template<class F, class ...C>
  163. inline derived_t & bind_recv(F&& fun, C&&... obj)
  164. {
  165. this->listener_.bind(event_type::recv,
  166. observer_t<std::shared_ptr<session_t>&, std::string_view>(
  167. std::forward<F>(fun), std::forward<C>(obj)...));
  168. return (this->derived());
  169. }
  170. /**
  171. * @brief bind accept listener
  172. * @param fun - a user defined callback function.
  173. * @param obj - a pointer or reference to a class object, this parameter can be none.
  174. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  175. * the class object's pointer or reference.
  176. * This notification is invoked immediately when the server accept a new connection
  177. * Function signature : void(std::shared_ptr<asio2::tcp_session>& session_ptr)
  178. */
  179. template<class F, class ...C>
  180. inline derived_t & bind_accept(F&& fun, C&&... obj)
  181. {
  182. this->listener_.bind(event_type::accept,
  183. observer_t<std::shared_ptr<session_t>&>(
  184. std::forward<F>(fun), std::forward<C>(obj)...));
  185. return (this->derived());
  186. }
  187. /**
  188. * @brief bind connect listener
  189. * @param fun - a user defined callback function.
  190. * @param obj - a pointer or reference to a class object, this parameter can be none.
  191. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  192. * the class object's pointer or reference.
  193. * This notification is invoked after the connection is fully established,
  194. * and only after the connect/handshake/upgrade are completed.
  195. * Function signature : void(std::shared_ptr<asio2::tcp_session>& session_ptr)
  196. */
  197. template<class F, class ...C>
  198. inline derived_t & bind_connect(F&& fun, C&&... obj)
  199. {
  200. this->listener_.bind(event_type::connect,
  201. observer_t<std::shared_ptr<session_t>&>(
  202. std::forward<F>(fun), std::forward<C>(obj)...));
  203. return (this->derived());
  204. }
  205. /**
  206. * @brief bind disconnect listener
  207. * @param fun - a user defined callback function.
  208. * @param obj - a pointer or reference to a class object, this parameter can be none.
  209. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  210. * the class object's pointer or reference.
  211. * This notification is invoked before the connection is disconnected, you can call
  212. * get_last_error/last_error_msg, etc, to get the disconnected error information
  213. * Function signature : void(std::shared_ptr<asio2::tcp_session>& session_ptr)
  214. * If is http or websocket server, when enter the disconnect callback, the socket of the
  215. * session maybe closed already.
  216. */
  217. template<class F, class ...C>
  218. inline derived_t & bind_disconnect(F&& fun, C&&... obj)
  219. {
  220. this->listener_.bind(event_type::disconnect,
  221. observer_t<std::shared_ptr<session_t>&>(
  222. std::forward<F>(fun), std::forward<C>(obj)...));
  223. return (this->derived());
  224. }
  225. /**
  226. * @brief bind init listener,we should set socket options at here
  227. * @param fun - a user defined callback function.
  228. * @param obj - a pointer or reference to a class object, this parameter can be none.
  229. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  230. * the class object's pointer or reference.
  231. * This notification is called after the socket is opened.
  232. * You can set the socket option in this notification.
  233. * Function signature : void()
  234. */
  235. template<class F, class ...C>
  236. inline derived_t & bind_init(F&& fun, C&&... obj)
  237. {
  238. this->listener_.bind(event_type::init, observer_t<>(
  239. std::forward<F>(fun), std::forward<C>(obj)...));
  240. return (this->derived());
  241. }
  242. /**
  243. * @brief bind start listener
  244. * @param fun - a user defined callback function.
  245. * @param obj - a pointer or reference to a class object, this parameter can be none.
  246. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  247. * the class object's pointer or reference.
  248. * This notification is called after the server starts up, whether successful or unsuccessful
  249. * Function signature : void()
  250. */
  251. template<class F, class ...C>
  252. inline derived_t & bind_start(F&& fun, C&&... obj)
  253. {
  254. this->listener_.bind(event_type::start, observer_t<>(
  255. std::forward<F>(fun), std::forward<C>(obj)...));
  256. return (this->derived());
  257. }
  258. /**
  259. * @brief bind stop listener
  260. * @param fun - a user defined callback function.
  261. * @param obj - a pointer or reference to a class object, this parameter can be none.
  262. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  263. * the class object's pointer or reference.
  264. * This notification is called before the server is ready to stop
  265. * Function signature : void()
  266. */
  267. template<class F, class ...C>
  268. inline derived_t & bind_stop(F&& fun, C&&... obj)
  269. {
  270. this->listener_.bind(event_type::stop, observer_t<>(
  271. std::forward<F>(fun), std::forward<C>(obj)...));
  272. return (this->derived());
  273. }
  274. public:
  275. /**
  276. * @brief get the acceptor reference
  277. */
  278. inline asio::ip::tcp::acceptor & acceptor() noexcept { return *(this->acceptor_); }
  279. /**
  280. * @brief get the acceptor reference
  281. */
  282. inline asio::ip::tcp::acceptor const& acceptor() const noexcept { return *(this->acceptor_); }
  283. protected:
  284. template<typename String, typename StrOrInt, typename C>
  285. inline bool _do_start(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
  286. {
  287. derived_t& derive = this->derived();
  288. // if log is enabled, init the log first, otherwise when "Too many open files" error occurs,
  289. // the log file will be created failed too.
  290. #if defined(ASIO2_ENABLE_LOG)
  291. asio2::detail::get_logger();
  292. #endif
  293. this->start_iopool();
  294. if (!this->is_iopool_started())
  295. {
  296. set_last_error(asio::error::operation_aborted);
  297. return false;
  298. }
  299. asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable
  300. {
  301. detail::ignore_unused(this_ptr);
  302. // init the running thread id
  303. derive.io_->init_thread_id();
  304. });
  305. // use promise to get the result of async accept
  306. std::promise<error_code> promise;
  307. std::future<error_code> future = promise.get_future();
  308. // use derfer to ensure the promise's value must be seted.
  309. detail::defer_event pg
  310. {
  311. [promise = std::move(promise)]() mutable
  312. {
  313. promise.set_value(get_last_error());
  314. }
  315. };
  316. derive.post(
  317. [this, this_ptr = derive.selfptr(), ecs = std::move(ecs), pg = std::move(pg),
  318. host = std::forward<String>(host), port = std::forward<StrOrInt>(port)]
  319. () mutable
  320. {
  321. derived_t& derive = this->derived();
  322. state_t expected = state_t::stopped;
  323. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  324. {
  325. // if the state is not stopped, set the last error to already_started
  326. set_last_error(asio::error::already_started);
  327. return;
  328. }
  329. // must read/write ecs in the io_context thread.
  330. derive.ecs_ = ecs;
  331. derive.io_->regobj(&derive);
  332. #if defined(_DEBUG) || defined(DEBUG)
  333. this->sessions_.is_all_session_stop_called_ = false;
  334. this->is_stop_called_ = false;
  335. #endif
  336. // convert to string maybe throw some exception.
  337. std::string h = detail::to_string(std::move(host));
  338. std::string p = detail::to_string(std::move(port));
  339. expected = state_t::starting;
  340. if (!this->state_.compare_exchange_strong(expected, state_t::starting))
  341. {
  342. ASIO2_ASSERT(false);
  343. derive._handle_start(asio::error::operation_aborted, std::move(this_ptr), std::move(ecs));
  344. return;
  345. }
  346. super::start();
  347. this->counter_ptr_ = std::shared_ptr<void>((void*)1, [&derive](void*) mutable
  348. {
  349. derive._exec_stop(asio::error::operation_aborted, derive.selfptr());
  350. });
  351. error_code ec, ec_ignore;
  352. this->acceptor_->cancel(ec_ignore);
  353. this->acceptor_->close(ec_ignore);
  354. // parse address and port
  355. asio::ip::tcp::resolver resolver(this->io_->context());
  356. auto results = resolver.resolve(h, p,
  357. asio::ip::resolver_base::flags::passive |
  358. asio::ip::resolver_base::flags::address_configured, ec);
  359. if (ec)
  360. {
  361. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  362. return;
  363. }
  364. if (results.empty())
  365. {
  366. derive._handle_start(asio::error::host_not_found, std::move(this_ptr), std::move(ecs));
  367. return;
  368. }
  369. asio::ip::tcp::endpoint endpoint = *results.begin();
  370. this->acceptor_->open(endpoint.protocol(), ec);
  371. if (ec)
  372. {
  373. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  374. return;
  375. }
  376. // when you close socket in linux system,and start socket
  377. // immediate,you will get like this "the address is in use",
  378. // and bind is failed,but i'm suer i close the socket correct
  379. // already before,why does this happen? the reasion is the
  380. // socket option "TIME_WAIT",although you close the socket,
  381. // but the system not release the socket,util 2~4 seconds
  382. // later,so we can use the SO_REUSEADDR option to avoid this
  383. // problem,like below
  384. // set port reuse
  385. this->acceptor_->set_option(asio::ip::tcp::acceptor::reuse_address(true), ec_ignore);
  386. clear_last_error();
  387. derive._fire_init();
  388. this->acceptor_->bind(endpoint, ec);
  389. if (ec)
  390. {
  391. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  392. return;
  393. }
  394. this->acceptor_->listen(asio::socket_base::max_listen_connections, ec);
  395. if (ec)
  396. {
  397. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  398. return;
  399. }
  400. // if the some error occured in the _fire_init notify function, the
  401. // get_last_error maybe not zero, so if we use _handle_start(get_last_error()...
  402. // at here, the start will failed, and the user don't know what happend.
  403. // so we need use as this : _handle_start(error_code{}...
  404. derive._handle_start(ec, std::move(this_ptr), std::move(ecs));
  405. });
  406. if (!derive.io_->running_in_this_thread())
  407. {
  408. set_last_error(future.get());
  409. return static_cast<bool>(!get_last_error());
  410. }
  411. else
  412. {
  413. set_last_error(asio::error::in_progress);
  414. }
  415. // if the state is stopped , the return value is "is_started()".
  416. // if the state is stopping, the return value is false, the last error is already_started
  417. // if the state is starting, the return value is false, the last error is already_started
  418. // if the state is started , the return value is true , the last error is already_started
  419. return derive.is_started();
  420. }
  421. template<typename C>
  422. inline void _handle_start(error_code ec, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  423. {
  424. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  425. // Whether the startup succeeds or fails, always call fire_start notification
  426. state_t expected = state_t::starting;
  427. if (!ec)
  428. if (!this->state_.compare_exchange_strong(expected, state_t::started))
  429. ec = asio::error::operation_aborted;
  430. set_last_error(ec);
  431. this->derived()._fire_start();
  432. expected = state_t::started;
  433. if (!ec)
  434. if (!this->state_.compare_exchange_strong(expected, state_t::started))
  435. ec = asio::error::operation_aborted;
  436. if (ec)
  437. {
  438. this->derived()._do_stop(ec, std::move(this_ptr));
  439. return;
  440. }
  441. this->derived()._post_accept(std::move(this_ptr), std::move(ecs));
  442. }
  443. inline void _do_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  444. {
  445. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  446. state_t expected = state_t::starting;
  447. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  448. return this->derived()._post_stop(ec, std::move(this_ptr), expected);
  449. expected = state_t::started;
  450. if (this->state_.compare_exchange_strong(expected, state_t::stopping))
  451. return this->derived()._post_stop(ec, std::move(this_ptr), expected);
  452. }
  453. inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, state_t old_state)
  454. {
  455. // asio don't allow operate the same socket in multi thread,
  456. // if you close socket in one thread and another thread is
  457. // calling socket's async_... function,it will crash.so we
  458. // must care for operate the socket. when need close the
  459. // socket ,we use the io_context to post a event,make sure the
  460. // socket's close operation is in the same thread.
  461. asio::dispatch(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
  462. [this, ec, old_state, this_ptr = std::move(this_ptr)]() mutable
  463. {
  464. detail::ignore_unused(this, ec, old_state, this_ptr);
  465. set_last_error(ec);
  466. ASIO2_ASSERT(this->state_ == state_t::stopping);
  467. // start timer to hold the acceptor io_context
  468. // should hold the server shared ptr too, if server is constructed with iopool, and
  469. // server is a tmp local variable, then the server maybe destroyed before sessions.
  470. // so we need hold this ptr to ensure server must be destroyed after sessions.
  471. this->counter_timer_->expires_after((std::chrono::nanoseconds::max)());
  472. this->counter_timer_->async_wait([this_ptr](const error_code&)
  473. {
  474. detail::ignore_unused(this_ptr);
  475. });
  476. // stop all the sessions, the session::stop must be no blocking,
  477. // otherwise it may be cause loop lock.
  478. this->sessions_.quick_for_each([](std::shared_ptr<session_t> & session_ptr) mutable
  479. {
  480. session_ptr->stop();
  481. });
  482. #if defined(_DEBUG) || defined(DEBUG)
  483. // Check whether all sessions are evenly distributed in io threads
  484. std::vector<std::shared_ptr<io_t>> iots;
  485. std::vector<int> session_counter;
  486. iots.resize(this->iopool().size());
  487. session_counter.resize(this->iopool().size());
  488. for (std::size_t i = 0; i < iots.size(); ++i)
  489. {
  490. iots[i] = this->_get_io(i);
  491. }
  492. this->sessions_.quick_for_each([&iots, &session_counter](std::shared_ptr<session_t>& session_ptr) mutable
  493. {
  494. for (std::size_t i = 0; i < iots.size(); ++i)
  495. {
  496. if (session_ptr->io_ == iots[i])
  497. {
  498. session_counter[i]++;
  499. break;
  500. }
  501. }
  502. });
  503. if (iots.size() > std::size_t(2) && this->get_session_count() > ((iots.size() - 1) * 5))
  504. {
  505. ASIO2_ASSERT(session_counter[0] == 0);
  506. int count_diff = (std::max)(int(this->get_session_count() / (iots.size() - 1) / 10), 10);
  507. for (std::size_t i = 1; i < iots.size(); ++i)
  508. {
  509. ASIO2_ASSERT(std::abs(session_counter[1] - session_counter[i]) < count_diff);
  510. }
  511. }
  512. asio2::ignore_unused(iots, session_counter);
  513. asio2::ignore_unused(this->sessions_.empty()); // used to test ThreadSafetyAnalysis
  514. #endif
  515. #if defined(_DEBUG) || defined(DEBUG)
  516. this->sessions_.is_all_session_stop_called_ = true;
  517. #endif
  518. if (this->counter_ptr_)
  519. {
  520. this->counter_ptr_.reset();
  521. }
  522. else
  523. {
  524. this->derived()._exec_stop(ec, std::move(this_ptr));
  525. }
  526. }));
  527. }
  528. inline void _exec_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  529. {
  530. // use asio::post to ensure this server's _handle_stop is called must be after
  531. // all sessions _handle_stop has been called already.
  532. // if use asio::dispatch, session's _handle_stop maybe called first.
  533. asio::post(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
  534. [this, ec, this_ptr = std::move(this_ptr)]() mutable
  535. {
  536. state_t expected = state_t::stopping;
  537. if (this->state_.compare_exchange_strong(expected, state_t::stopped))
  538. {
  539. this->derived()._handle_stop(ec, std::move(this_ptr));
  540. }
  541. else
  542. {
  543. ASIO2_ASSERT(false);
  544. }
  545. }));
  546. }
  547. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr)
  548. {
  549. set_last_error(ec);
  550. this->derived()._fire_stop();
  551. detail::cancel_timer(*(this->acceptor_timer_));
  552. detail::cancel_timer(*(this->counter_timer_));
  553. // call the base class stop function
  554. super::stop();
  555. error_code ec_ignore{};
  556. // call acceptor's close function to notify the _handle_accept
  557. // function response with error > 0 , then the listen socket
  558. // can get notify to exit must ensure the close function has
  559. // been called,otherwise the _handle_accept will never return
  560. this->acceptor_->cancel(ec_ignore);
  561. this->acceptor_->close(ec_ignore);
  562. ASIO2_ASSERT(this->state_ == state_t::stopped);
  563. }
  564. template<typename... Args>
  565. inline std::shared_ptr<session_t> _make_session(Args&&... args)
  566. {
  567. // skip zero io, the 0 io is used for acceptor.
  568. // but if the iopool size is 1, this io will be the zero io forever.
  569. std::shared_ptr<io_t> iot;
  570. if (this->iots_.size() > std::size_t(1))
  571. {
  572. iot = this->_get_io();
  573. if (iot == this->_get_io(0))
  574. iot = this->_get_io();
  575. }
  576. else
  577. {
  578. iot = this->_get_io();
  579. }
  580. return std::make_shared<session_t>(std::forward<Args>(args)...,
  581. this->sessions_, this->listener_, std::move(iot),
  582. this->init_buffer_size_, this->max_buffer_size_);
  583. }
  584. template<typename C>
  585. inline void _post_accept(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  586. {
  587. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  588. if (!this->derived().is_started())
  589. return;
  590. std::shared_ptr<session_t> session_ptr = this->derived()._make_session();
  591. #if defined(_DEBUG) || defined(DEBUG)
  592. ASIO2_ASSERT(this->derived().post_recv_counter_.load() == 0);
  593. this->derived().post_recv_counter_++;
  594. #endif
  595. asio::io_context& ex = session_ptr->io_->context();
  596. this->acceptor_->async_accept(ex, make_allocator(this->rallocator_,
  597. [this, sptr = std::move(session_ptr), this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  598. (const error_code& ec, asio::ip::tcp::socket peer) mutable
  599. {
  600. #if defined(_DEBUG) || defined(DEBUG)
  601. this->derived().post_recv_counter_--;
  602. #endif
  603. sptr->socket().lowest_layer() = std::move(peer);
  604. this->derived()._handle_accept(ec, std::move(sptr), std::move(this_ptr), std::move(ecs));
  605. }));
  606. }
  607. template<typename C>
  608. inline void _handle_accept(
  609. const error_code& ec, std::shared_ptr<session_t> session_ptr,
  610. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  611. {
  612. set_last_error(ec);
  613. // if the acceptor status is closed,don't call _post_accept again.
  614. if (ec == asio::error::operation_aborted)
  615. return;
  616. if (!this->derived().is_started())
  617. return;
  618. session_ptr->counter_ptr_ = this->counter_ptr_;
  619. session_ptr->start(detail::to_shared_ptr(ecs->clone()));
  620. // handle exception, may be is the exception "Too many open files" (exception code : 24)
  621. // asio::error::no_descriptors - Too many open files
  622. if (ec)
  623. {
  624. ASIO2_LOG_ERROR("Error occurred when accept:{} {}", ec.value(), ec.message());
  625. this->acceptor_timer_->expires_after(std::chrono::seconds(1));
  626. this->acceptor_timer_->async_wait(
  627. [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  628. (const error_code& ec) mutable
  629. {
  630. if (ec == asio::error::operation_aborted)
  631. return;
  632. asio::post(this->io_->context(), make_allocator(this->wallocator(),
  633. [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]() mutable
  634. {
  635. this->derived()._post_accept(std::move(this_ptr), std::move(ecs));
  636. }));
  637. });
  638. return;
  639. }
  640. this->derived()._post_accept(std::move(this_ptr), std::move(ecs));
  641. }
  642. inline void _fire_init()
  643. {
  644. // the _fire_init must be executed in the thread 0.
  645. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  646. ASIO2_ASSERT(!get_last_error());
  647. this->listener_.notify(event_type::init);
  648. }
  649. inline void _fire_start()
  650. {
  651. // the _fire_start must be executed in the thread 0.
  652. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  653. #if defined(_DEBUG) || defined(DEBUG)
  654. ASIO2_ASSERT(this->is_stop_called_ == false);
  655. #endif
  656. this->listener_.notify(event_type::start);
  657. }
  658. inline void _fire_stop()
  659. {
  660. // the _fire_stop must be executed in the thread 0.
  661. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  662. #if defined(_DEBUG) || defined(DEBUG)
  663. this->is_stop_called_ = true;
  664. #endif
  665. this->listener_.notify(event_type::stop);
  666. }
  667. protected:
  /// acceptor to accept client connection
  std::unique_ptr<asio::ip::tcp::acceptor> acceptor_;
  /// timer used to delay the next accept after an accept error,
  /// like the error "Too many open files" (error code : 24)
  std::unique_ptr<asio::steady_timer> acceptor_timer_;
  /// used to hold the acceptor io_context until all sessions are closed already.
  std::unique_ptr<asio::steady_timer> counter_timer_;
  /// initial buffer size, passed to every session created by _make_session
  std::size_t init_buffer_size_ = tcp_frame_size;
  /// max buffer size, passed to every session created by _make_session
  std::size_t max_buffer_size_ = max_buffer_size;
  #if defined(_DEBUG) || defined(DEBUG)
  /// debug only: set to true by _fire_stop, reset to false by _do_start,
  /// and asserted to be false by _fire_start
  bool is_stop_called_ = false;
  #endif
  679. };
  680. }
  681. namespace asio2
  682. {
  683. template<class derived_t, class session_t>
  684. using tcp_server_impl_t = detail::tcp_server_impl_t<derived_t, session_t>;
  685. /**
  686. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  687. * asio::error::no_descriptors - Too many open files
  688. */
  689. template<class session_t>
  690. class tcp_server_t : public detail::tcp_server_impl_t<tcp_server_t<session_t>, session_t>
  691. {
  692. public:
  693. using detail::tcp_server_impl_t<tcp_server_t<session_t>, session_t>::tcp_server_impl_t;
  694. };
  /**
   * @brief tcp server
   * If this object is created as a shared_ptr, like std::shared_ptr<asio2::tcp_server> server;
   * you must call server->stop() manually before exiting, otherwise it may cause memory leaks.
   * @throws the constructor may throw the exception "Too many open files" (exception code : 24)
   * asio::error::no_descriptors - Too many open files
   */
  702. using tcp_server = tcp_server_t<tcp_session>;
  703. }
  704. #if defined(ASIO2_INCLUDE_RATE_LIMIT)
  705. #include <asio2/tcp/tcp_stream.hpp>
  706. namespace asio2
  707. {
  708. template<class session_t>
  709. class tcp_rate_server_t : public asio2::tcp_server_impl_t<tcp_rate_server_t<session_t>, session_t>
  710. {
  711. public:
  712. using asio2::tcp_server_impl_t<tcp_rate_server_t<session_t>, session_t>::tcp_server_impl_t;
  713. };
  714. using tcp_rate_server = tcp_rate_server_t<tcp_rate_session>;
  715. }
  716. #endif
  717. #include <asio2/base/detail/pop_options.hpp>
  718. #endif // !__ASIO2_TCP_SERVER_HPP__