send_cp.hpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. *
  10. * https://www.zhihu.com/question/25016042
  11. * Windows API : send
  12. * The successful completion of a send function does not indicate that the data
  13. * was successfully delivered and received to the recipient. This function only
  14. * indicates the data was successfully sent.
  15. *
  16. */
  17. #ifndef __ASIO2_SEND_COMPONENT_HPP__
  18. #define __ASIO2_SEND_COMPONENT_HPP__
  19. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  20. #pragma once
  21. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  22. #include <cstdint>
  23. #include <memory>
  24. #include <functional>
  25. #include <string>
  26. #include <future>
  27. #include <tuple>
  28. #include <utility>
  29. #include <string_view>
  30. #include <asio2/base/iopool.hpp>
  31. #include <asio2/base/define.hpp>
  32. #include <asio2/base/detail/util.hpp>
  33. #include <asio2/base/detail/function_traits.hpp>
  34. #include <asio2/base/detail/buffer_wrap.hpp>
  35. #include <asio2/base/impl/data_persistence_cp.hpp>
  36. namespace asio2::detail
  37. {
  38. ASIO2_CLASS_FORWARD_DECLARE_BASE;
  39. template<class derived_t, class args_t>
  40. class send_cp : public data_persistence_cp<derived_t, args_t>
  41. {
  42. ASIO2_CLASS_FRIEND_DECLARE_BASE;
  43. public:
  44. /**
  45. * @brief constructor
  46. */
  47. send_cp() noexcept {}
  48. /**
  49. * @brief destructor
  50. */
  51. ~send_cp() = default;
  52. public:
  53. /**
  54. * @brief Asynchronous send data, support multiple data formats,
  55. * see asio::buffer(...) in /asio/buffer.hpp
  56. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  57. * use like this : std::string m; async_send(std::move(m)); can reducing memory allocation.
  58. * PodType * : async_send("abc");
  59. * PodType (&data)[N] : double m[10]; async_send(m);
  60. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  61. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  62. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  63. */
  64. template<class DataT>
  65. inline void async_send(DataT&& data) noexcept
  66. {
  67. derived_t& derive = static_cast<derived_t&>(*this);
  68. // use this guard to fix the issue of below "# issue x:"
  69. detail::integer_add_sub_guard asg(derive.io_->pending());
  70. // We must ensure that there is only one operation to send data
  71. // at the same time,otherwise may be cause crash.
  72. // # issue x:
  73. // 1. user call "async_send" function in some worker thread (not io_context thread)
  74. // 2. code run to here, "here" means between "if (!derive.is_started())" and
  75. // "derive.push_event("
  76. // 3. the operating system switched from this thread to other thread
  77. // 4. user call "client.stop", and the client is stopped successed
  78. // 5. the operating system switched from other thread to this thread
  79. // 6. code run to below, it means code run to "derive.push_event("
  80. // 7. beacuse the iopool is stopped alreadly, so the "derive.push_event(" and
  81. // "derive._do_send(..." will can't be executed.
  82. derive.push_event(
  83. [&derive, p = derive.selfptr(), id = derive.life_id(),
  84. data = derive._data_persistence(std::forward<DataT>(data))]
  85. (event_queue_guard<derived_t> g) mutable
  86. {
  87. if (!derive.is_started())
  88. {
  89. set_last_error(asio::error::not_connected);
  90. return;
  91. }
  92. if (id != derive.life_id())
  93. {
  94. set_last_error(asio::error::operation_aborted);
  95. return;
  96. }
  97. clear_last_error();
  98. derive._do_send(data, [g = std::move(g)](const error_code&, std::size_t) mutable {});
  99. });
  100. }
  101. /**
  102. * @brief Asynchronous send data
  103. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  104. * PodType * : async_send("abc");
  105. */
  106. template<class CharT, class Traits = std::char_traits<CharT>>
  107. inline typename std::enable_if_t<detail::is_char_v<CharT>, void> async_send(CharT * s) noexcept
  108. {
  109. derived_t& derive = static_cast<derived_t&>(*this);
  110. derive.async_send(s, s ? Traits::length(s) : 0);
  111. }
  112. /**
  113. * @brief Asynchronous send data
  114. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  115. * PodType (&data)[N] : double m[10]; async_send(m,5);
  116. */
  117. template<class CharT, class SizeT>
  118. inline typename std::enable_if_t<std::is_integral_v<detail::remove_cvref_t<SizeT>>, void>
  119. async_send(CharT* s, SizeT count) noexcept
  120. {
  121. derived_t& derive = static_cast<derived_t&>(*this);
  122. if (!s)
  123. {
  124. set_last_error(asio::error::invalid_argument);
  125. return;
  126. }
  127. detail::integer_add_sub_guard asg(derive.io_->pending());
  128. // We must ensure that there is only one operation to send data
  129. // at the same time,otherwise may be cause crash.
  130. derive.push_event(
  131. [&derive, p = derive.selfptr(), id = derive.life_id(), data = derive._data_persistence(s, count)]
  132. (event_queue_guard<derived_t> g) mutable
  133. {
  134. if (!derive.is_started())
  135. {
  136. set_last_error(asio::error::not_connected);
  137. return;
  138. }
  139. if (id != derive.life_id())
  140. {
  141. set_last_error(asio::error::operation_aborted);
  142. return;
  143. }
  144. clear_last_error();
  145. derive._do_send(data, [g = std::move(g)](const error_code&, std::size_t) mutable {});
  146. });
  147. }
  148. /**
  149. * @brief Asynchronous send data, support multiple data formats,
  150. * see asio::buffer(...) in /asio/buffer.hpp
  151. * use like this : std::string m; async_send(std::move(m)); can reducing memory allocation.
  152. * the pair.first save the send result error_code,the pair.second save the sent_bytes.
  153. * note : Do not call this function in any listener callback function like this:
  154. * auto future = async_send(msg,asio::use_future); future.get(); it will cause deadlock and
  155. * the future.get() will never return.
  156. * PodType * : async_send("abc");
  157. * PodType (&data)[N] : double m[10]; async_send(m);
  158. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  159. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  160. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  161. */
  162. template<class DataT>
  163. inline std::future<std::pair<error_code, std::size_t>> async_send(DataT&& data, asio::use_future_t<>)
  164. {
  165. derived_t& derive = static_cast<derived_t&>(*this);
  166. // why use copyable_wrapper? beacuse std::promise is moveable-only, but
  167. // std::function need copy-constructible.
  168. // 20220211 :
  169. // Due to the event_queue_cp uses a custom detail::function that supports moveable-only instead
  170. // of std::function, so copyable_wrapper is no longer required.
  171. // https://en.cppreference.com/w/cpp/utility/functional/function/function
  172. // The program is ill-formed if the target type is not copy-constructible or
  173. // initialization of the target is ill-formed.
  174. std::promise<std::pair<error_code, std::size_t>> promise;
  175. std::future<std::pair<error_code, std::size_t>> future = promise.get_future();
  176. detail::integer_add_sub_guard asg(derive.io_->pending());
  177. derive.push_event(
  178. [&derive, p = derive.selfptr(), id = derive.life_id(), promise = std::move(promise),
  179. data = derive._data_persistence(std::forward<DataT>(data))]
  180. (event_queue_guard<derived_t> g) mutable
  181. {
  182. if (!derive.is_started())
  183. {
  184. set_last_error(asio::error::not_connected);
  185. promise.set_value(std::pair<error_code, std::size_t>(asio::error::not_connected, 0));
  186. return;
  187. }
  188. if (id != derive.life_id())
  189. {
  190. set_last_error(asio::error::operation_aborted);
  191. promise.set_value(std::pair<error_code, std::size_t>(asio::error::operation_aborted, 0));
  192. return;
  193. }
  194. clear_last_error();
  195. derive._do_send(data, [&promise, g = std::move(g)]
  196. (const error_code& ec, std::size_t bytes_sent) mutable
  197. {
  198. promise.set_value(std::pair<error_code, std::size_t>(ec, bytes_sent));
  199. });
  200. });
  201. return future;
  202. }
  203. /**
  204. * @brief Asynchronous send data
  205. * the pair.first save the send result error_code,the pair.second save the sent_bytes.
  206. * note : Do not call this function in any listener callback function like this:
  207. * auto future = async_send(msg,asio::use_future); future.get(); it will cause deadlock and
  208. * the future.get() will never return.
  209. * PodType * : async_send("abc");
  210. */
  211. template<class CharT, class Traits = std::char_traits<CharT>>
  212. inline typename std::enable_if_t<detail::is_char_v<CharT>, std::future<std::pair<error_code, std::size_t>>>
  213. async_send(CharT * s, asio::use_future_t<> flag)
  214. {
  215. derived_t& derive = static_cast<derived_t&>(*this);
  216. return derive.async_send(s, s ? Traits::length(s) : 0, std::move(flag));
  217. }
  218. /**
  219. * @brief Asynchronous send data
  220. * the pair.first save the send result error_code,the pair.second save the sent_bytes.
  221. * note : Do not call this function in any listener callback function like this:
  222. * auto future = async_send(msg,asio::use_future); future.get(); it will cause deadlock,
  223. * the future.get() will never return.
  224. * PodType (&data)[N] : double m[10]; async_send(m,5);
  225. */
  226. template<class CharT, class SizeT>
  227. inline typename std::enable_if_t<std::is_integral_v<detail::remove_cvref_t<SizeT>>,
  228. std::future<std::pair<error_code, std::size_t>>>
  229. async_send(CharT * s, SizeT count, asio::use_future_t<>)
  230. {
  231. derived_t& derive = static_cast<derived_t&>(*this);
  232. std::promise<std::pair<error_code, std::size_t>> promise;
  233. std::future<std::pair<error_code, std::size_t>> future = promise.get_future();
  234. if (!s)
  235. {
  236. set_last_error(asio::error::invalid_argument);
  237. promise.set_value(std::pair<error_code, std::size_t>(asio::error::invalid_argument, 0));
  238. return future;
  239. }
  240. detail::integer_add_sub_guard asg(derive.io_->pending());
  241. derive.push_event(
  242. [&derive, p = derive.selfptr(), id = derive.life_id(), promise = std::move(promise),
  243. data = derive._data_persistence(s, count)]
  244. (event_queue_guard<derived_t> g) mutable
  245. {
  246. if (!derive.is_started())
  247. {
  248. set_last_error(asio::error::not_connected);
  249. promise.set_value(std::pair<error_code, std::size_t>(asio::error::not_connected, 0));
  250. return;
  251. }
  252. if (id != derive.life_id())
  253. {
  254. set_last_error(asio::error::operation_aborted);
  255. promise.set_value(std::pair<error_code, std::size_t>(asio::error::operation_aborted, 0));
  256. return;
  257. }
  258. clear_last_error();
  259. derive._do_send(data, [&promise, g = std::move(g)]
  260. (const error_code& ec, std::size_t bytes_sent) mutable
  261. {
  262. promise.set_value(std::pair<error_code, std::size_t>(ec, bytes_sent));
  263. });
  264. });
  265. return future;
  266. }
  267. /**
  268. * @brief Asynchronous send data, support multiple data formats,
  269. * see asio::buffer(...) in /asio/buffer.hpp
  270. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  271. * use like this : std::string m; async_send(std::move(m)); can reducing memory allocation.
  272. * PodType * : async_send("abc");
  273. * PodType (&data)[N] : double m[10]; async_send(m);
  274. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  275. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  276. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  277. * Callback signature : void() or void(std::size_t bytes_sent)
  278. */
  279. template<class DataT, class Callback>
  280. inline typename std::enable_if_t<is_callable_v<Callback>, void> async_send(DataT&& data, Callback&& fn)
  281. {
  282. derived_t& derive = static_cast<derived_t&>(*this);
  283. detail::integer_add_sub_guard asg(derive.io_->pending());
  284. // We must ensure that there is only one operation to send data
  285. // at the same time,otherwise may be cause crash.
  286. derive.push_event(
  287. [&derive, p = derive.selfptr(), id = derive.life_id(), fn = std::forward<Callback>(fn),
  288. data = derive._data_persistence(std::forward<DataT>(data))]
  289. (event_queue_guard<derived_t> g) mutable
  290. {
  291. if (!derive.is_started())
  292. {
  293. set_last_error(asio::error::not_connected);
  294. callback_helper::call(fn, 0);
  295. return;
  296. }
  297. if (id != derive.life_id())
  298. {
  299. set_last_error(asio::error::operation_aborted);
  300. callback_helper::call(fn, 0);
  301. return;
  302. }
  303. clear_last_error();
  304. derive._do_send(data, [&fn, g = std::move(g)]
  305. (const error_code&, std::size_t bytes_sent) mutable
  306. {
  307. ASIO2_ASSERT(!g.is_empty());
  308. callback_helper::call(fn, bytes_sent);
  309. });
  310. });
  311. }
  312. /**
  313. * @brief Asynchronous send data
  314. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  315. * PodType * : async_send("abc");
  316. * Callback signature : void() or void(std::size_t bytes_sent)
  317. */
  318. template<class Callback, class CharT, class Traits = std::char_traits<CharT>>
  319. inline typename std::enable_if_t<is_callable_v<Callback> && detail::is_char_v<CharT>, void>
  320. async_send(CharT * s, Callback&& fn)
  321. {
  322. derived_t& derive = static_cast<derived_t&>(*this);
  323. derive.async_send(s, s ? Traits::length(s) : 0, std::forward<Callback>(fn));
  324. }
  325. /**
  326. * @brief Asynchronous send data
  327. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  328. * PodType (&data)[N] : double m[10]; async_send(m,5);
  329. * Callback signature : void() or void(std::size_t bytes_sent)
  330. */
  331. template<class Callback, class CharT, class SizeT>
  332. inline typename std::enable_if_t<is_callable_v<Callback> &&
  333. std::is_integral_v<detail::remove_cvref_t<SizeT>>, void>
  334. async_send(CharT * s, SizeT count, Callback&& fn)
  335. {
  336. derived_t& derive = static_cast<derived_t&>(*this);
  337. detail::integer_add_sub_guard asg(derive.io_->pending());
  338. // We must ensure that there is only one operation to send data
  339. // at the same time,otherwise may be cause crash.
  340. derive.push_event(
  341. [&derive, p = derive.selfptr(), id = derive.life_id(), fn = std::forward<Callback>(fn),
  342. s, data = derive._data_persistence(s, count)]
  343. (event_queue_guard<derived_t> g) mutable
  344. {
  345. if (!s)
  346. {
  347. set_last_error(asio::error::invalid_argument);
  348. callback_helper::call(fn, 0);
  349. return;
  350. }
  351. if (!derive.is_started())
  352. {
  353. set_last_error(asio::error::not_connected);
  354. callback_helper::call(fn, 0);
  355. return;
  356. }
  357. if (id != derive.life_id())
  358. {
  359. set_last_error(asio::error::operation_aborted);
  360. callback_helper::call(fn, 0);
  361. return;
  362. }
  363. clear_last_error();
  364. derive._do_send(data, [&fn, g = std::move(g)]
  365. (const error_code&, std::size_t bytes_sent) mutable
  366. {
  367. callback_helper::call(fn, bytes_sent);
  368. });
  369. });
  370. }
  371. public:
  372. /**
  373. * @brief Synchronous send data, support multiple data formats,
  374. * see asio::buffer(...) in /asio/buffer.hpp
  375. * @return Number of bytes written from the data. If an error occurred,
  376. * this will be less than the sum of the data size.
  377. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  378. * Note : If this function is called in communication thread, it will degenerates into async_send
  379. * and the return value is 0, you can use asio2::get_last_error() to check whether the
  380. * send is success, if asio2::get_last_error() is equal to asio::error::in_progress, it
  381. * means success, otherwise failed.
  382. * use like this : std::string m; send(std::move(m)); can reducing memory allocation.
  383. * PodType * : send("abc");
  384. * PodType (&data)[N] : double m[10]; send(m);
  385. * std::array<PodType, N> : std::array<int,10> m; send(m);
  386. * std::vector<PodType, Allocator> : std::vector<float> m; send(m);
  387. * std::basic_string<Elem, Traits, Allocator> : std::string m; send(m);
  388. */
  389. template<class DataT>
  390. inline std::size_t send(DataT&& data)
  391. {
  392. derived_t& derive = static_cast<derived_t&>(*this);
  393. std::future<std::pair<error_code, std::size_t>> future = derive.async_send(
  394. std::forward<DataT>(data), asio::use_future);
  395. // Whether we run on the io_context thread
  396. if (derive.io_->running_in_this_thread())
  397. {
  398. std::future_status status = future.wait_for(std::chrono::nanoseconds(0));
  399. // async_send failed :
  400. // beacuse current thread is the io_context thread, so the data must be sent in the future,
  401. // so if the "status" is ready, it means that the data must have failed to be sent.
  402. if (status == std::future_status::ready)
  403. {
  404. set_last_error(future.get().first);
  405. return std::size_t(0);
  406. }
  407. // async_send in_progress.
  408. else
  409. {
  410. set_last_error(asio::error::in_progress);
  411. return std::size_t(0);
  412. }
  413. }
  414. std::pair<error_code, std::size_t> pair = future.get();
  415. set_last_error(pair.first);
  416. return pair.second;
  417. }
  418. /**
  419. * @brief Synchronous send data
  420. * @return Number of bytes written from the data. If an error occurred,
  421. * this will be less than the sum of the data size.
  422. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  423. * Note : If this function is called in communication thread, it will degenerates into async_send
  424. * and the return value is 0, you can use asio2::get_last_error() to check whether the
  425. * send is success, if asio2::get_last_error() is equal to asio::error::in_progress, it
  426. * means success, otherwise failed.
  427. * PodType * : send("abc");
  428. */
  429. template<class CharT, class Traits = std::char_traits<CharT>>
  430. inline typename std::enable_if_t<detail::is_char_v<CharT>, std::size_t> send(CharT * s)
  431. {
  432. derived_t& derive = static_cast<derived_t&>(*this);
  433. return derive.send(s, s ? Traits::length(s) : 0);
  434. }
  435. /**
  436. * @brief Synchronous send data
  437. * @return Number of bytes written from the data. If an error occurred,
  438. * this will be less than the sum of the data size.
  439. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  440. * Note : If this function is called in communication thread, it will degenerates into async_send
  441. * and the return value is 0, you can use asio2::get_last_error() to check whether the
  442. * send is success, if asio2::get_last_error() is equal to asio::error::in_progress, it
  443. * means success, otherwise failed.
  444. * PodType (&data)[N] : double m[10]; send(m,5);
  445. */
  446. template<class CharT, class SizeT>
  447. inline typename std::enable_if_t<std::is_integral_v<detail::remove_cvref_t<SizeT>>, std::size_t>
  448. send(CharT* s, SizeT count)
  449. {
  450. derived_t& derive = static_cast<derived_t&>(*this);
  451. return derive.send(derive._data_persistence(s, count));
  452. }
  453. protected:
  454. /**
  455. * @brief Asynchronous send data, support multiple data formats,
  456. * see asio::buffer(...) in /asio/buffer.hpp
  457. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  458. * use like this : std::string m; async_send(std::move(m)); can reducing memory allocation.
  459. * PodType * : async_send("abc");
  460. * PodType (&data)[N] : double m[10]; async_send(m);
  461. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  462. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  463. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  464. */
  465. template<class DataT>
  466. inline void internal_async_send(std::shared_ptr<derived_t> this_ptr, DataT&& data) noexcept
  467. {
  468. derived_t& derive = static_cast<derived_t&>(*this);
  469. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  470. derive.push_event(
  471. [&derive, p = std::move(this_ptr), id = derive.life_id(),
  472. data = derive._data_persistence(std::forward<DataT>(data))]
  473. (event_queue_guard<derived_t> g) mutable
  474. {
  475. if (!derive.is_started())
  476. {
  477. set_last_error(asio::error::not_connected);
  478. return;
  479. }
  480. if (id != derive.life_id())
  481. {
  482. set_last_error(asio::error::operation_aborted);
  483. return;
  484. }
  485. clear_last_error();
  486. derive._do_send(data, [g = std::move(g), p = std::move(p)](const error_code&, std::size_t) mutable
  487. {
  488. {
  489. [[maybe_unused]] auto t{ std::move(g) };
  490. }
  491. });
  492. });
  493. }
  494. /**
  495. * @brief Asynchronous send data, support multiple data formats,
  496. * see asio::buffer(...) in /asio/buffer.hpp
  497. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  498. * use like this : std::string m; async_send(std::move(m)); can reducing memory allocation.
  499. * PodType * : async_send("abc");
  500. * PodType (&data)[N] : double m[10]; async_send(m);
  501. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  502. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  503. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  504. * Callback signature : void() or void(std::size_t bytes_sent)
  505. */
  506. template<class DataT, class Callback>
  507. inline typename std::enable_if_t<is_callable_v<Callback>, void> internal_async_send(
  508. std::shared_ptr<derived_t> this_ptr, DataT&& data, Callback&& fn)
  509. {
  510. derived_t& derive = static_cast<derived_t&>(*this);
  511. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  512. derive.push_event(
  513. [&derive, p = std::move(this_ptr), id = derive.life_id(), fn = std::forward<Callback>(fn),
  514. data = derive._data_persistence(std::forward<DataT>(data))]
  515. (event_queue_guard<derived_t> g) mutable
  516. {
  517. if (!derive.is_started())
  518. {
  519. set_last_error(asio::error::not_connected);
  520. callback_helper::call(fn, 0);
  521. return;
  522. }
  523. if (id != derive.life_id())
  524. {
  525. set_last_error(asio::error::operation_aborted);
  526. callback_helper::call(fn, 0);
  527. return;
  528. }
  529. clear_last_error();
  530. derive._do_send(data, [&fn, g = std::move(g), p = std::move(p)]
  531. (const error_code&, std::size_t bytes_sent) mutable
  532. {
  533. ASIO2_ASSERT(!g.is_empty());
  534. callback_helper::call(fn, bytes_sent);
  535. {
  536. [[maybe_unused]] auto t{ std::move(g) };
  537. }
  538. });
  539. });
  540. }
  541. /**
  542. * @brief Asynchronous send data, support multiple data formats,
  543. * see asio::buffer(...) in /asio/buffer.hpp
  544. * You can call this function on the communication thread and anywhere,it's multi thread safed.
  545. * use like this : std::string m; async_send(std::move(m)); can reducing memory allocation.
  546. * PodType * : async_send("abc");
  547. * PodType (&data)[N] : double m[10]; async_send(m);
  548. * std::array<PodType, N> : std::array<int,10> m; async_send(m);
  549. * std::vector<PodType, Allocator> : std::vector<float> m; async_send(m);
  550. * std::basic_string<Elem, Traits, Allocator> : std::string m; async_send(m);
  551. * Callback signature : void() or void(std::size_t bytes_sent)
  552. */
  553. template<class DataT, class Callback>
  554. inline void internal_async_send(
  555. std::shared_ptr<derived_t> this_ptr, DataT&& data, Callback&& fn, event_queue_guard<derived_t> g)
  556. {
  557. derived_t& derive = static_cast<derived_t&>(*this);
  558. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  559. derive.disp_event(
  560. [&derive, p = std::move(this_ptr), id = derive.life_id(), fn = std::forward<Callback>(fn),
  561. data = derive._data_persistence(std::forward<DataT>(data))]
  562. (event_queue_guard<derived_t> g) mutable
  563. {
  564. if (!derive.is_started())
  565. {
  566. set_last_error(asio::error::not_connected);
  567. fn(std::move(p), asio::error::not_connected, 0, std::move(g));
  568. return;
  569. }
  570. if (id != derive.life_id())
  571. {
  572. set_last_error(asio::error::operation_aborted);
  573. fn(std::move(p), asio::error::operation_aborted, 0, std::move(g));
  574. return;
  575. }
  576. clear_last_error();
  577. derive._do_send(data, [fn = std::move(fn), p = std::move(p), g = std::move(g)]
  578. (const error_code& ec, std::size_t bytes_sent) mutable
  579. {
  580. fn(std::move(p), ec, bytes_sent, std::move(g));
  581. });
  582. }, std::move(g));
  583. }
  584. };
  585. }
  586. #endif // !__ASIO2_SEND_COMPONENT_HPP__