tcp_client.hpp 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_TCP_CLIENT_HPP__
  11. #define __ASIO2_TCP_CLIENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <asio2/base/detail/push_options.hpp>
  16. #include <asio2/base/client.hpp>
  17. #include <asio2/tcp/impl/tcp_keepalive_cp.hpp>
  18. #include <asio2/tcp/impl/tcp_send_op.hpp>
  19. #include <asio2/tcp/impl/tcp_recv_op.hpp>
  20. namespace asio2::detail
  21. {
  22. struct template_args_tcp_client : public tcp_tag
  23. {
  24. static constexpr bool is_session = false;
  25. static constexpr bool is_client = true;
  26. static constexpr bool is_server = false;
  27. using socket_t = asio::ip::tcp::socket;
  28. using buffer_t = asio::streambuf;
  29. using send_data_t = std::string_view;
  30. using recv_data_t = std::string_view;
  31. };
  32. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  33. ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE;
  34. ASIO2_CLASS_FORWARD_DECLARE_TCP_CLIENT;
  35. template<class derived_t, class args_t = template_args_tcp_client>
  36. class tcp_client_impl_t
  37. : public client_impl_t <derived_t, args_t>
  38. , public tcp_keepalive_cp <derived_t, args_t>
  39. , public tcp_send_op <derived_t, args_t>
  40. , public tcp_recv_op <derived_t, args_t>
  41. , public tcp_tag
  42. {
  43. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  44. ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE;
  45. ASIO2_CLASS_FRIEND_DECLARE_TCP_CLIENT;
  46. public:
  47. using super = client_impl_t <derived_t, args_t>;
  48. using self = tcp_client_impl_t<derived_t, args_t>;
  49. using args_type = args_t;
  50. using buffer_type = typename args_t::buffer_t;
  51. using send_data_t = typename args_t::send_data_t;
  52. using recv_data_t = typename args_t::recv_data_t;
  53. public:
  54. /**
  55. * @brief constructor
  56. */
  57. explicit tcp_client_impl_t(
  58. std::size_t init_buf_size = tcp_frame_size,
  59. std::size_t max_buf_size = max_buffer_size,
  60. std::size_t concurrency = 1
  61. )
  62. : super(init_buf_size, max_buf_size, concurrency)
  63. , tcp_keepalive_cp<derived_t, args_t>()
  64. , tcp_send_op <derived_t, args_t>()
  65. , tcp_recv_op <derived_t, args_t>()
  66. {
  67. this->set_connect_timeout(std::chrono::milliseconds(tcp_connect_timeout));
  68. }
  69. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  70. explicit tcp_client_impl_t(
  71. std::size_t init_buf_size,
  72. std::size_t max_buf_size,
  73. Scheduler&& scheduler
  74. )
  75. : super(init_buf_size, max_buf_size, std::forward<Scheduler>(scheduler))
  76. , tcp_keepalive_cp<derived_t, args_t>()
  77. , tcp_send_op <derived_t, args_t>()
  78. , tcp_recv_op <derived_t, args_t>()
  79. {
  80. this->set_connect_timeout(std::chrono::milliseconds(tcp_connect_timeout));
  81. }
  82. template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  83. explicit tcp_client_impl_t(Scheduler&& scheduler)
  84. : tcp_client_impl_t(tcp_frame_size, max_buffer_size, std::forward<Scheduler>(scheduler))
  85. {
  86. }
  87. // -- Supporting initializer_list would make the code of the inheriting classes less concise
  88. //template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  89. //explicit tcp_client_impl_t(
  90. // std::size_t init_buf_size,
  91. // std::size_t max_buf_size,
  92. // std::initializer_list<Scheduler> scheduler
  93. //)
  94. // : tcp_client_impl_t(init_buf_size, max_buf_size, std::vector<Scheduler>{std::move(scheduler)})
  95. //{
  96. //}
  97. //template<class Scheduler, std::enable_if_t<!std::is_integral_v<detail::remove_cvref_t<Scheduler>>, int> = 0>
  98. //explicit tcp_client_impl_t(std::initializer_list<Scheduler> scheduler)
  99. // : tcp_client_impl_t(tcp_frame_size, max_buffer_size, std::move(scheduler))
  100. //{
  101. //}
  102. /**
  103. * @brief destructor
  104. */
  105. ~tcp_client_impl_t()
  106. {
  107. this->stop();
  108. }
  109. /**
  110. * @brief start the client, blocking connect to server
  111. * @param host - A string identifying a location. May be a descriptive name or
  112. * a numeric address string.
  113. * @param port - A string identifying the requested service. This may be a
  114. * descriptive name or a numeric string corresponding to a port number.
  115. * @param args - The delimiter condition.Valid value types include the following:
  116. * char,std::string,std::string_view,
  117. * function:std::pair<iterator, bool> match_condition(iterator begin, iterator end),
  118. * asio::transfer_at_least,asio::transfer_exactly
  119. * more details see asio::read_until
  120. */
  121. template<typename String, typename StrOrInt, typename... Args>
  122. inline bool start(String&& host, StrOrInt&& port, Args&&... args)
  123. {
  124. return this->derived().template _do_connect<false>(
  125. std::forward<String>(host), std::forward<StrOrInt>(port),
  126. ecs_helper::make_ecs(asio::transfer_at_least(1), std::forward<Args>(args)...));
  127. }
  128. /**
  129. * @brief start the client, asynchronous connect to server
  130. * @param host - A string identifying a location. May be a descriptive name or
  131. * a numeric address string.
  132. * @param port - A string identifying the requested service. This may be a
  133. * descriptive name or a numeric string corresponding to a port number.
  134. * @param args - The delimiter condition.Valid value types include the following:
  135. * char,std::string,std::string_view,
  136. * function:std::pair<iterator, bool> match_condition(iterator begin, iterator end),
  137. * asio::transfer_at_least,asio::transfer_exactly
  138. * more details see asio::read_until
  139. */
  140. template<typename String, typename StrOrInt, typename... Args>
  141. inline bool async_start(String&& host, StrOrInt&& port, Args&&... args)
  142. {
  143. return this->derived().template _do_connect<true>(
  144. std::forward<String>(host), std::forward<StrOrInt>(port),
  145. ecs_helper::make_ecs(asio::transfer_at_least(1), std::forward<Args>(args)...));
  146. }
  147. /**
  148. * @brief stop the client
  149. * You can call this function in the communication thread and anywhere to stop the client.
  150. * If this function is called in the communication thread, it will post a asynchronous
  151. * event into the event queue, then return immediately.
  152. * If this function is called not in the communication thread, it will blocking forever
  153. * util the client is stopped completed.
  154. */
  155. inline void stop()
  156. {
  157. if (this->is_iopool_stopped())
  158. return;
  159. derived_t& derive = this->derived();
  160. derive.io_->unregobj(&derive);
  161. // use promise to get the result of stop
  162. std::promise<state_t> promise;
  163. std::future<state_t> future = promise.get_future();
  164. // use derfer to ensure the promise's value must be seted.
  165. detail::defer_event pg
  166. {
  167. [this, p = std::move(promise)]() mutable
  168. {
  169. p.set_value(this->state_.load());
  170. }
  171. };
  172. // if user call stop in the recv callback, use post event to executed a async event.
  173. derive.post_event([&derive, this_ptr = derive.selfptr(), pg = std::move(pg)]
  174. (event_queue_guard<derived_t> g) mutable
  175. {
  176. // first close the reconnect timer
  177. derive._stop_reconnect_timer();
  178. derive._do_disconnect(asio::error::operation_aborted, derive.selfptr(), defer_event
  179. {
  180. [&derive, this_ptr = std::move(this_ptr), pg = std::move(pg)]
  181. (event_queue_guard<derived_t> g) mutable
  182. {
  183. derive._do_stop(asio::error::operation_aborted, std::move(this_ptr), defer_event
  184. {
  185. [pg = std::move(pg)](event_queue_guard<derived_t> g) mutable
  186. {
  187. detail::ignore_unused(pg, g);
  188. // the "pg" should destroyed before the "g", otherwise if the "g"
  189. // is destroyed before "pg", the next event maybe called, then the
  190. // state maybe change to not stopped.
  191. {
  192. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  193. }
  194. }, std::move(g)
  195. });
  196. }, std::move(g)
  197. });
  198. });
  199. // use this to ensure the client is stopped completed when the stop is called not in
  200. // the io_context thread
  201. while (!derive.running_in_this_thread())
  202. {
  203. std::future_status status = future.wait_for(std::chrono::milliseconds(100));
  204. if (status == std::future_status::ready)
  205. {
  206. ASIO2_ASSERT(future.get() == state_t::stopped);
  207. break;
  208. }
  209. else
  210. {
  211. if (derive.get_thread_id() == std::thread::id{})
  212. break;
  213. if (derive.io_->context().stopped())
  214. break;
  215. }
  216. }
  217. this->stop_iopool();
  218. }
  219. public:
  220. /**
  221. * @brief bind recv listener
  222. * @param fun - a user defined callback function.
  223. * @param obj - a pointer or reference to a class object, this parameter can be none.
  224. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  225. * the class object's pointer or reference.
  226. * Function signature : void(std::string_view data)
  227. */
  228. template<class F, class ...C>
  229. inline derived_t & bind_recv(F&& fun, C&&... obj)
  230. {
  231. this->listener_.bind(event_type::recv,
  232. observer_t<std::string_view>(std::forward<F>(fun), std::forward<C>(obj)...));
  233. return (this->derived());
  234. }
  235. /**
  236. * @brief bind connect listener
  237. * @param fun - a user defined callback function.
  238. * @param obj - a pointer or reference to a class object, this parameter can be none.
  239. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  240. * the class object's pointer or reference.
  241. * This notification is called after the client connection completed, whether successful or unsuccessful
  242. * Function signature : void()
  243. */
  244. template<class F, class ...C>
  245. inline derived_t & bind_connect(F&& fun, C&&... obj)
  246. {
  247. this->listener_.bind(event_type::connect,
  248. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  249. return (this->derived());
  250. }
  251. /**
  252. * @brief bind disconnect listener
  253. * @param fun - a user defined callback function.
  254. * @param obj - a pointer or reference to a class object, this parameter can be none.
  255. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  256. * the class object's pointer or reference.
  257. * This notification is called before the client is ready to disconnect
  258. * Function signature : void()
  259. */
  260. template<class F, class ...C>
  261. inline derived_t & bind_disconnect(F&& fun, C&&... obj)
  262. {
  263. this->listener_.bind(event_type::disconnect,
  264. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  265. return (this->derived());
  266. }
  267. /**
  268. * @brief bind init listener,we should set socket options at here
  269. * @param fun - a user defined callback function.
  270. * @param obj - a pointer or reference to a class object, this parameter can be none.
  271. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  272. * the class object's pointer or reference.
  273. * Function signature : void()
  274. */
  275. template<class F, class ...C>
  276. inline derived_t & bind_init(F&& fun, C&&... obj)
  277. {
  278. this->listener_.bind(event_type::init,
  279. observer_t<>(std::forward<F>(fun), std::forward<C>(obj)...));
  280. return (this->derived());
  281. }
  282. protected:
  283. template<bool IsAsync, typename String, typename StrOrInt, typename C>
  284. inline bool _do_connect(String&& host, StrOrInt&& port, std::shared_ptr<ecs_t<C>> ecs)
  285. {
  286. derived_t& derive = this->derived();
  287. // if log is enabled, init the log first, otherwise when "Too many open files" error occurs,
  288. // the log file will be created failed too.
  289. #if defined(ASIO2_ENABLE_LOG)
  290. asio2::detail::get_logger();
  291. #endif
  292. this->start_iopool();
  293. if (!this->is_iopool_started())
  294. {
  295. set_last_error(asio::error::operation_aborted);
  296. return false;
  297. }
  298. asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable
  299. {
  300. detail::ignore_unused(this_ptr);
  301. // init the running thread id
  302. derive.io_->init_thread_id();
  303. });
  304. // use promise to get the result of async connect
  305. std::promise<error_code> promise;
  306. std::future<error_code> future = promise.get_future();
  307. // use derfer to ensure the promise's value must be seted.
  308. detail::defer_event pg
  309. {
  310. [promise = std::move(promise)]() mutable
  311. {
  312. promise.set_value(get_last_error());
  313. }
  314. };
  315. // if user call start in the recv callback, use post event to executed a async event.
  316. derive.post_event(
  317. [this, this_ptr = derive.selfptr(), ecs = std::move(ecs),
  318. host = std::forward<String>(host), port = std::forward<StrOrInt>(port), pg = std::move(pg)]
  319. (event_queue_guard<derived_t> g) mutable
  320. {
  321. derived_t& derive = this->derived();
  322. defer_event chain
  323. {
  324. [pg = std::move(pg)] (event_queue_guard<derived_t> g) mutable
  325. {
  326. detail::ignore_unused(pg, g);
  327. // the "pg" should destroyed before the "g", otherwise if the "g"
  328. // is destroyed before "pg", the next event maybe called, then the
  329. // state maybe change to not stopped.
  330. {
  331. [[maybe_unused]] detail::defer_event t{ std::move(pg) };
  332. }
  333. }, std::move(g)
  334. };
  335. state_t expected = state_t::stopped;
  336. if (!derive.state_.compare_exchange_strong(expected, state_t::starting))
  337. {
  338. // if the state is not stopped, set the last error to already_started
  339. set_last_error(asio::error::already_started);
  340. return;
  341. }
  342. // must read/write ecs in the io_context thread.
  343. derive.ecs_ = ecs;
  344. clear_last_error();
  345. derive.io_->regobj(&derive);
  346. #if defined(_DEBUG) || defined(DEBUG)
  347. this->is_stop_reconnect_timer_called_ = false;
  348. this->is_post_reconnect_timer_called_ = false;
  349. this->is_stop_connect_timeout_timer_called_ = false;
  350. this->is_disconnect_called_ = false;
  351. #endif
  352. // convert to string maybe throw some exception.
  353. this->host_ = detail::to_string(std::move(host));
  354. this->port_ = detail::to_string(std::move(port));
  355. super::start();
  356. derive._do_init(ecs);
  357. // ecs init
  358. derive._rdc_init(ecs);
  359. derive._socks5_init(ecs);
  360. derive.template _start_connect<IsAsync>(std::move(this_ptr), std::move(ecs), std::move(chain));
  361. });
  362. if constexpr (IsAsync)
  363. {
  364. set_last_error(asio::error::in_progress);
  365. return true;
  366. }
  367. else
  368. {
  369. if (!derive.io_->running_in_this_thread())
  370. {
  371. set_last_error(future.get());
  372. // beacuse here code is running in the user thread, not in the io_context thread,
  373. // so, even if the client is start successed, but if the server disconnect this
  374. // client after connect success, and when code run to here, the client's state
  375. // maybe stopping, so if we return derive.is_started();, the return value maybe
  376. // false, but we did connect to the server is successfully.
  377. return static_cast<bool>(!get_last_error());
  378. }
  379. else
  380. {
  381. set_last_error(asio::error::in_progress);
  382. }
  383. // if the state is stopped , the return value is "is_started()".
  384. // if the state is stopping, the return value is false, the last error is already_started
  385. // if the state is starting, the return value is false, the last error is already_started
  386. // if the state is started , the return value is true , the last error is already_started
  387. return derive.is_started();
  388. }
  389. }
  390. template<typename C>
  391. inline void _do_init(std::shared_ptr<ecs_t<C>>&) noexcept
  392. {
  393. #if defined(ASIO2_ENABLE_LOG)
  394. // Used to test whether the behavior of different compilers is consistent
  395. static_assert(tcp_send_op<derived_t, args_t>::template has_member_dgram<self>::value,
  396. "The behavior of different compilers is not consistent");
  397. #endif
  398. if constexpr (std::is_same_v<typename ecs_t<C>::condition_lowest_type, use_dgram_t>)
  399. this->dgram_ = true;
  400. else
  401. this->dgram_ = false;
  402. }
  403. template<typename C, typename DeferEvent>
  404. inline void _do_start(
  405. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  406. {
  407. this->derived().update_alive_time();
  408. this->derived().reset_connect_time();
  409. this->derived()._start_recv(std::move(this_ptr), std::move(ecs), std::move(chain));
  410. }
  411. template<typename DeferEvent>
  412. inline void _handle_disconnect(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  413. {
  414. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  415. ASIO2_ASSERT(this->state_ == state_t::stopped);
  416. ASIO2_LOG_DEBUG("tcp_client::_handle_disconnect: {} {}", ec.value(), ec.message());
  417. this->derived()._rdc_stop();
  418. // should we close the socket in handle disconnect function? otherwise when send
  419. // data failed, will cause the _do_disconnect function be called, then cause the
  420. // auto reconnect executed, and then the _post_recv will be return with some error,
  421. // and the _post_recv will cause the auto reconnect executed again.
  422. // can't use push event to close the socket, beacuse when used with websocket,
  423. // the websocket's async_close will be called, and the chain will passed into
  424. // the async_close, but the async_close will cause the chain interrupted, and
  425. // we don't know when the async_close will be completed, if another push event
  426. // was called during async_close executing, then here push event will after
  427. // the another event in the queue.
  428. // call shutdown again, beacuse the do shutdown maybe not called, eg: when
  429. // protocol error is checked in the mqtt or http, then the do disconnect
  430. // maybe called directly.
  431. // the socket maybe closed already in the connect timeout timer.
  432. if (this->socket().is_open())
  433. {
  434. error_code ec_linger{}, ec_ignore{};
  435. asio::socket_base::linger lnger{};
  436. this->socket().lowest_layer().get_option(lnger, ec_linger);
  437. // call socket's close function to notify the _handle_recv function response with
  438. // error > 0 ,then the socket can get notify to exit
  439. // Call shutdown() to indicate that you will not write any more data to the socket.
  440. if (!ec_linger && !(lnger.enabled() == true && lnger.timeout() == 0))
  441. {
  442. this->socket().shutdown(asio::socket_base::shutdown_both, ec_ignore);
  443. }
  444. // if the socket is basic_stream with rate limit, we should call the cancel,
  445. // otherwise the rate timer maybe can't canceled, and cause the io_context
  446. // can't stopped forever, even if the socket is closed already.
  447. this->socket().cancel(ec_ignore);
  448. // Call close,otherwise the _handle_recv will never return
  449. this->socket().close(ec_ignore);
  450. }
  451. super::_handle_disconnect(ec, std::move(this_ptr), std::move(chain));
  452. }
  453. template<typename DeferEvent>
  454. inline void _do_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  455. {
  456. // When use call client.stop in the io_context thread, then the iopool is not stopped,
  457. // but this client is stopped, When client.stop is called again in the not io_context
  458. // thread, then this client state is stopped.
  459. ASIO2_ASSERT(this->state_ == state_t::stopped);
  460. this->derived()._post_stop(ec, std::move(this_ptr), std::move(chain));
  461. }
  462. template<typename DeferEvent>
  463. inline void _post_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  464. {
  465. // All pending sending events will be cancelled after enter the callback below.
  466. this->derived().disp_event([this, ec, this_ptr = std::move(this_ptr), e = chain.move_event()]
  467. (event_queue_guard<derived_t> g) mutable
  468. {
  469. set_last_error(ec);
  470. defer_event chain(std::move(e), std::move(g));
  471. // call the base class stop function
  472. super::stop();
  473. // call CRTP polymorphic stop
  474. this->derived()._handle_stop(ec, std::move(this_ptr), std::move(chain));
  475. }, chain.move_guard());
  476. }
  477. template<typename DeferEvent>
  478. inline void _handle_stop(const error_code& ec, std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  479. {
  480. detail::ignore_unused(ec, this_ptr, chain);
  481. this->derived()._socks5_stop();
  482. ASIO2_ASSERT(this->state_ == state_t::stopped);
  483. }
  484. template<typename C, typename DeferEvent>
  485. inline void _start_recv(
  486. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  487. {
  488. // Connect succeeded. post recv request.
  489. asio::dispatch(this->derived().io_->context(), make_allocator(this->derived().wallocator(),
  490. [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  491. () mutable
  492. {
  493. using condition_lowest_type = typename ecs_t<C>::condition_lowest_type;
  494. detail::ignore_unused(chain);
  495. if constexpr (!std::is_same_v<condition_lowest_type, asio2::detail::hook_buffer_t>)
  496. {
  497. this->derived().buffer().consume(this->derived().buffer().size());
  498. }
  499. else
  500. {
  501. std::ignore = true;
  502. }
  503. this->derived()._post_recv(std::move(this_ptr), std::move(ecs));
  504. }));
  505. }
  506. template<class Data, class Callback>
  507. inline bool _do_send(Data& data, Callback&& callback)
  508. {
  509. return this->derived()._tcp_send(data, std::forward<Callback>(callback));
  510. }
  511. template<class Data>
  512. inline send_data_t _rdc_convert_to_send_data(Data& data) noexcept
  513. {
  514. auto buffer = asio::buffer(data);
  515. return send_data_t{ reinterpret_cast<
  516. std::string_view::const_pointer>(buffer.data()),buffer.size() };
  517. }
  518. template<class Invoker>
  519. inline void _rdc_invoke_with_none(const error_code& ec, Invoker& invoker)
  520. {
  521. if (invoker)
  522. invoker(ec, send_data_t{}, recv_data_t{});
  523. }
  524. template<class Invoker>
  525. inline void _rdc_invoke_with_recv(const error_code& ec, Invoker& invoker, recv_data_t data)
  526. {
  527. if (invoker)
  528. invoker(ec, send_data_t{}, data);
  529. }
  530. template<class Invoker>
  531. inline void _rdc_invoke_with_send(const error_code& ec, Invoker& invoker, send_data_t data)
  532. {
  533. if (invoker)
  534. invoker(ec, data, recv_data_t{});
  535. }
  536. protected:
  537. template<typename C>
  538. inline void _post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  539. {
  540. this->derived()._tcp_post_recv(std::move(this_ptr), std::move(ecs));
  541. }
  542. template<typename C>
  543. inline void _handle_recv(const error_code & ec, std::size_t bytes_recvd,
  544. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  545. {
  546. this->derived()._tcp_handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  547. }
  548. inline void _fire_init()
  549. {
  550. // the _fire_init must be executed in the thread 0.
  551. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  552. ASIO2_ASSERT(!get_last_error());
  553. this->listener_.notify(event_type::init);
  554. }
  555. template<typename C>
  556. inline void _fire_recv(
  557. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
  558. {
  559. data = detail::call_data_filter_before_recv(this->derived(), data);
  560. this->listener_.notify(event_type::recv, data);
  561. this->derived()._rdc_handle_recv(this_ptr, ecs, data);
  562. }
  563. template<typename C>
  564. inline void _fire_connect(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  565. {
  566. // the _fire_connect must be executed in the thread 0.
  567. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  568. #if defined(_DEBUG) || defined(DEBUG)
  569. ASIO2_ASSERT(this->is_disconnect_called_ == false);
  570. #endif
  571. if (!get_last_error())
  572. {
  573. this->derived()._rdc_start(this_ptr, ecs);
  574. }
  575. this->listener_.notify(event_type::connect);
  576. }
  577. inline void _fire_disconnect(std::shared_ptr<derived_t>& this_ptr)
  578. {
  579. // the _fire_disconnect must be executed in the thread 0.
  580. ASIO2_ASSERT(this->derived().io_->running_in_this_thread());
  581. #if defined(_DEBUG) || defined(DEBUG)
  582. this->is_disconnect_called_ = true;
  583. #endif
  584. detail::ignore_unused(this_ptr);
  585. this->listener_.notify(event_type::disconnect);
  586. }
  587. protected:
  // set by _do_init: true when the lowest condition type is use_dgram_t
  bool dgram_{ false };

  #if defined(_DEBUG) || defined(DEBUG)
  // debug only: set by _fire_disconnect, reset in _do_connect, asserted in _fire_connect
  bool is_disconnect_called_{ false };
  #endif
  592. };
  593. }
  594. namespace asio2
  595. {
  596. using tcp_client_args = detail::template_args_tcp_client;
  597. template<class derived_t, class args_t>
  598. using tcp_client_impl_t = detail::tcp_client_impl_t<derived_t, args_t>;
  599. /**
  600. * @brief tcp client template class
  601. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  602. * asio::error::no_descriptors - Too many open files
  603. */
  604. template<class derived_t>
  605. class tcp_client_t : public detail::tcp_client_impl_t<derived_t, detail::template_args_tcp_client>
  606. {
  607. public:
  608. using detail::tcp_client_impl_t<derived_t, detail::template_args_tcp_client>::tcp_client_impl_t;
  609. };
  610. /**
  611. * @brief tcp client
  612. * If this object is created as a shared_ptr like std::shared_ptr<asio2::tcp_client> client;
  613. * you must call the client->stop() manual when exit, otherwise maybe cause memory leaks.
  614. * @throws constructor maybe throw exception "Too many open files" (exception code : 24)
  615. * asio::error::no_descriptors - Too many open files
  616. */
  617. class tcp_client : public tcp_client_t<tcp_client>
  618. {
  619. public:
  620. using tcp_client_t<tcp_client>::tcp_client_t;
  621. };
  622. }
  623. #if defined(ASIO2_INCLUDE_RATE_LIMIT)
  624. #include <asio2/tcp/tcp_stream.hpp>
  625. namespace asio2
  626. {
  627. struct tcp_rate_client_args : public tcp_client_args
  628. {
  629. using socket_t = asio2::tcp_stream<asio2::simple_rate_policy>;
  630. };
  631. template<class derived_t>
  632. class tcp_rate_client_t : public asio2::tcp_client_impl_t<derived_t, tcp_rate_client_args>
  633. {
  634. public:
  635. using asio2::tcp_client_impl_t<derived_t, tcp_rate_client_args>::tcp_client_impl_t;
  636. };
  637. class tcp_rate_client : public asio2::tcp_rate_client_t<tcp_rate_client>
  638. {
  639. public:
  640. using asio2::tcp_rate_client_t<tcp_rate_client>::tcp_rate_client_t;
  641. };
  642. }
  643. #endif
  644. #include <asio2/base/detail/pop_options.hpp>
  645. #endif // !__ASIO2_TCP_CLIENT_HPP__