tcp_recv_op.hpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_TCP_RECV_OP_HPP__
  11. #define __ASIO2_TCP_RECV_OP_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <memory>
  16. #include <future>
  17. #include <utility>
  18. #include <string_view>
  19. #include <asio2/base/error.hpp>
  20. #include <asio2/base/detail/ecs.hpp>
  21. namespace asio2::detail
  22. {
  23. template<class derived_t, class args_t>
  24. class tcp_recv_op
  25. {
  26. protected:
  27. template<class, class = std::void_t<>>
  28. struct has_member_dgram : std::false_type {};
  29. template<class T>
  30. struct has_member_dgram<T, std::void_t<decltype(T::dgram_)>> : std::true_type {};
  31. public:
  32. /**
  33. * @brief constructor
  34. */
  35. tcp_recv_op() noexcept {}
  36. /**
  37. * @brief destructor
  38. */
  39. ~tcp_recv_op() = default;
  40. protected:
  41. template<typename C>
  42. void _tcp_post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  43. {
  44. using condition_lowest_type = typename ecs_t<C>::condition_lowest_type;
  45. derived_t& derive = static_cast<derived_t&>(*this);
  46. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  47. if (!derive.is_started())
  48. {
  49. if (derive.state_ == state_t::started)
  50. {
  51. derive._do_disconnect(asio2::get_last_error(), std::move(this_ptr));
  52. }
  53. return;
  54. }
  55. #if defined(_DEBUG) || defined(DEBUG)
  56. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  57. derive.post_recv_counter_++;
  58. #endif
  59. ASIO2_ASSERT(derive.reading_ == false);
  60. derive.reading_ = true;
  61. ecs_t<C>& e = *ecs;
  62. if constexpr (
  63. std::is_same_v<condition_lowest_type, asio::detail::transfer_all_t> ||
  64. std::is_same_v<condition_lowest_type, asio::detail::transfer_at_least_t> ||
  65. std::is_same_v<condition_lowest_type, asio::detail::transfer_exactly_t> ||
  66. std::is_same_v<condition_lowest_type, asio2::detail::hook_buffer_t>)
  67. {
  68. asio::async_read(derive.stream(), derive.buffer().base(), e.get_condition().lowest(),
  69. make_allocator(derive.rallocator(),
  70. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  71. (const error_code& ec, std::size_t bytes_recvd) mutable
  72. {
  73. #if defined(_DEBUG) || defined(DEBUG)
  74. derive.post_recv_counter_--;
  75. #endif
  76. derive.reading_ = false;
  77. derive._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  78. }));
  79. }
  80. else
  81. {
  82. asio::async_read_until(derive.stream(), derive.buffer().base(), e.get_condition().lowest(),
  83. make_allocator(derive.rallocator(),
  84. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  85. (const error_code& ec, std::size_t bytes_recvd) mutable
  86. {
  87. #if defined(_DEBUG) || defined(DEBUG)
  88. derive.post_recv_counter_--;
  89. #endif
  90. derive.reading_ = false;
  91. derive._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  92. }));
  93. }
  94. }
  95. template<typename C>
  96. void _tcp_dgram_fire_recv(
  97. const error_code& ec, std::size_t bytes_recvd,
  98. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs)
  99. {
  100. detail::ignore_unused(ec);
  101. derived_t& derive = static_cast<derived_t&>(*this);
  102. const std::uint8_t* buffer = static_cast<const std::uint8_t*>(derive.buffer().data().data());
  103. if /**/ (std::uint8_t(buffer[0]) < std::uint8_t(254))
  104. {
  105. derive._fire_recv(this_ptr, ecs, std::string_view(reinterpret_cast<
  106. std::string_view::const_pointer>(buffer + 1), bytes_recvd - 1));
  107. }
  108. else if (std::uint8_t(buffer[0]) == std::uint8_t(254))
  109. {
  110. derive._fire_recv(this_ptr, ecs, std::string_view(reinterpret_cast<
  111. std::string_view::const_pointer>(buffer + 1 + 2), bytes_recvd - 1 - 2));
  112. }
  113. else
  114. {
  115. ASIO2_ASSERT(std::uint8_t(buffer[0]) == std::uint8_t(255));
  116. derive._fire_recv(this_ptr, ecs, std::string_view(reinterpret_cast<
  117. std::string_view::const_pointer>(buffer + 1 + 8), bytes_recvd - 1 - 8));
  118. }
  119. }
  120. template<typename C>
  121. void _tcp_handle_recv(
  122. const error_code& ec, std::size_t bytes_recvd,
  123. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  124. {
  125. using condition_lowest_type = typename ecs_t<C>::condition_lowest_type;
  126. derived_t& derive = static_cast<derived_t&>(*this);
  127. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  128. set_last_error(ec);
  129. // bytes_recvd : The number of bytes in the streambuf's get area up to and including the delimiter.
  130. // After testing, it is found that even if the "ec" is 0, the socket may have been closed already,
  131. // the stop function of the base class has been called already, and the member variable resources
  132. // have been destroyed already, so we need check the "derive.is_started()" to ensure that the user
  133. // maybe read write the member variable resources in the recv callback, and it will cause crash.
  134. // the code can't be like this:
  135. //if (!ec && derive.is_started())
  136. //{
  137. // _fire_recv(...);
  138. // _post_recv(...);
  139. //}
  140. //else
  141. //{
  142. // // after call client.stop, and user call client.start again, when code run to here, the state
  143. // // maybe changed to starting by the client.start, if we call _do_disconnect at here, the state
  144. // // is changed to stopping again, this will break the client.start processing.
  145. // _do_disconnect(...);
  146. //}
  147. if (!derive.is_started())
  148. {
  149. if (derive.state_ == state_t::started)
  150. {
  151. ASIO2_LOG_INFOR("_tcp_handle_recv with closed socket: {} {}", ec.value(), ec.message());
  152. derive._do_disconnect(ec, this_ptr);
  153. }
  154. derive._stop_readend_timer(std::move(this_ptr));
  155. return;
  156. }
  157. if (!ec)
  158. {
  159. // every times recv data,we update the last alive time.
  160. derive.update_alive_time();
  161. if constexpr (std::is_same_v<condition_lowest_type, use_dgram_t>)
  162. {
  163. if constexpr (has_member_dgram<derived_t>::value)
  164. {
  165. if (bytes_recvd == 0)
  166. {
  167. derive._do_disconnect(asio::error::no_data, this_ptr);
  168. derive._stop_readend_timer(std::move(this_ptr));
  169. return;
  170. }
  171. }
  172. else
  173. {
  174. ASIO2_ASSERT(false);
  175. }
  176. derive._tcp_dgram_fire_recv(ec, bytes_recvd, this_ptr, ecs);
  177. }
  178. else
  179. {
  180. if constexpr (!std::is_same_v<condition_lowest_type, asio2::detail::hook_buffer_t>)
  181. {
  182. derive._fire_recv(this_ptr, ecs, std::string_view(reinterpret_cast<
  183. std::string_view::const_pointer>(derive.buffer().data().data()), bytes_recvd));
  184. }
  185. else
  186. {
  187. derive._fire_recv(this_ptr, ecs, std::string_view(reinterpret_cast<
  188. std::string_view::const_pointer>(derive.buffer().data().data()),
  189. derive.buffer().size()));
  190. }
  191. }
  192. if constexpr (!std::is_same_v<condition_lowest_type, asio2::detail::hook_buffer_t>)
  193. {
  194. derive.buffer().consume(bytes_recvd);
  195. }
  196. else
  197. {
  198. std::ignore = true;
  199. }
  200. derive._post_recv(std::move(this_ptr), std::move(ecs));
  201. }
  202. else
  203. {
  204. ASIO2_LOG_DEBUG("_tcp_handle_recv with error: {} {}", ec.value(), ec.message());
  205. if (ec == asio::error::eof)
  206. {
  207. // /beast/http/impl/read.hpp
  208. //if (ec == net::error::eof)
  209. //{
  210. // BHO_ASSERT(bytes_transferred == 0);
  211. // ...
  212. //}
  213. ASIO2_ASSERT(bytes_recvd == 0);
  214. if (bytes_recvd)
  215. {
  216. // http://www.purecpp.cn/detail?id=2303
  217. ASIO2_LOG_INFOR("_tcp_handle_recv with eof: {}", bytes_recvd);
  218. }
  219. }
  220. derive._do_disconnect(ec, this_ptr);
  221. derive._stop_readend_timer(std::move(this_ptr));
  222. }
  223. // If an error occurs then no new asynchronous operations are started. This
  224. // means that all shared_ptr references to the connection object will
  225. // disappear and the object will be destroyed automatically after this
  226. // handler returns. The connection class's destructor closes the socket.
  227. }
  228. protected:
  229. };
  230. }
  231. #endif // !__ASIO2_TCP_RECV_OP_HPP__