http_recv_op.hpp 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_HTTP_RECV_OP_HPP__
  11. #define __ASIO2_HTTP_RECV_OP_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <memory>
  16. #include <future>
  17. #include <utility>
  18. #include <string_view>
  19. #include <asio2/external/asio.hpp>
  20. #include <asio2/external/beast.hpp>
  21. #include <asio2/base/error.hpp>
  22. #include <asio2/http/detail/http_util.hpp>
  23. namespace asio2::detail
  24. {
  25. template<class derived_t, class args_t>
  26. class http_recv_op
  27. {
  28. public:
  29. using body_type = typename args_t::body_t;
  30. using buffer_type = typename args_t::buffer_t;
  31. /**
  32. * @brief constructor
  33. */
  34. http_recv_op() noexcept {}
  35. /**
  36. * @brief destructor
  37. */
  38. ~http_recv_op() = default;
  39. protected:
  40. template<typename C>
  41. void _http_session_post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  42. {
  43. derived_t& derive = static_cast<derived_t&>(*this);
  44. if (derive.is_http())
  45. {
  46. // Make the request empty before reading,
  47. // otherwise the operation behavior is undefined.
  48. derive.req_.reset();
  49. #if defined(_DEBUG) || defined(DEBUG)
  50. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  51. derive.post_recv_counter_++;
  52. #endif
  53. ASIO2_ASSERT(derive.reading_ == false);
  54. derive.reading_ = true;
  55. // Read a request
  56. http::async_read(derive.stream(), derive.buffer().base(), derive.req_,
  57. make_allocator(derive.rallocator(),
  58. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  59. (const error_code & ec, std::size_t bytes_recvd) mutable
  60. {
  61. #if defined(_DEBUG) || defined(DEBUG)
  62. derive.post_recv_counter_--;
  63. #endif
  64. derive.reading_ = false;
  65. derive._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  66. }));
  67. }
  68. else
  69. {
  70. #if defined(_DEBUG) || defined(DEBUG)
  71. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  72. derive.post_recv_counter_++;
  73. #endif
  74. ASIO2_ASSERT(derive.reading_ == false);
  75. derive.reading_ = true;
  76. // Read a message into our buffer
  77. derive.ws_stream().async_read(derive.buffer().base(),
  78. make_allocator(derive.rallocator(),
  79. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  80. (const error_code & ec, std::size_t bytes_recvd) mutable
  81. {
  82. #if defined(_DEBUG) || defined(DEBUG)
  83. derive.post_recv_counter_--;
  84. #endif
  85. derive.reading_ = false;
  86. derive._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  87. }));
  88. }
  89. }
  90. template<typename C>
  91. void _http_client_post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  92. {
  93. derived_t& derive = static_cast<derived_t&>(*this);
  94. // Make the request empty before reading,
  95. // otherwise the operation behavior is undefined.
  96. derive.rep_.reset();
  97. #if defined(_DEBUG) || defined(DEBUG)
  98. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  99. derive.post_recv_counter_++;
  100. #endif
  101. ASIO2_ASSERT(derive.reading_ == false);
  102. derive.reading_ = true;
  103. // Receive the HTTP response
  104. http::async_read(derive.stream(), derive.buffer().base(), derive.rep_,
  105. make_allocator(derive.rallocator(),
  106. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  107. (const error_code & ec, std::size_t bytes_recvd) mutable
  108. {
  109. #if defined(_DEBUG) || defined(DEBUG)
  110. derive.post_recv_counter_--;
  111. #endif
  112. derive.reading_ = false;
  113. derive._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  114. }));
  115. }
  116. template<typename C>
  117. void _http_post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  118. {
  119. derived_t& derive = static_cast<derived_t&>(*this);
  120. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  121. if (!derive.is_started())
  122. {
  123. if (derive.state_ == state_t::started)
  124. {
  125. derive._do_disconnect(asio2::get_last_error(), std::move(this_ptr));
  126. }
  127. return;
  128. }
  129. if constexpr (args_t::is_session)
  130. {
  131. derive._http_session_post_recv(std::move(this_ptr), std::move(ecs));
  132. }
  133. else
  134. {
  135. derive._http_client_post_recv(std::move(this_ptr), std::move(ecs));
  136. }
  137. }
  138. template<typename C>
  139. void _http_session_handle_recv(
  140. const error_code& ec, std::size_t bytes_recvd,
  141. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  142. {
  143. detail::ignore_unused(ec, bytes_recvd);
  144. derived_t& derive = static_cast<derived_t&>(*this);
  145. if (derive.is_http())
  146. {
  147. derive.req_.url_.reset(derive.req_.target());
  148. derive.rep_.result(http::status::unknown);
  149. derive.rep_.keep_alive(derive.req_.keep_alive());
  150. if (derive._check_upgrade(this_ptr, ecs))
  151. return;
  152. derive._fire_recv(this_ptr, ecs);
  153. // note : can't read write the variable of "req_" after _fire_recv, it maybe
  154. // cause crash, eg :
  155. // user called response.defer() in the recv callback, and pass the defer_ptr
  156. // into another thread, then code run to here, at this time, the "req_" maybe
  157. // read write in two thread : this thread and "another thread".
  158. // note : can't call "_do_disconnect" at here, beacuse if user has called
  159. // response.defer() in the recv callback, this session maybe closed before
  160. // the response is sent to the client.
  161. //if (derive.req_.need_eof() || !derive.req_.keep_alive())
  162. //{
  163. // derive._do_disconnect(asio::error::operation_aborted, derive.selfptr());
  164. // return;
  165. //}
  166. }
  167. else
  168. {
  169. derive.req_.ws_frame_type_ = websocket::frame::message;
  170. derive.req_.ws_frame_data_ = { reinterpret_cast<std::string_view::const_pointer>(
  171. derive.buffer().data().data()), bytes_recvd };
  172. derive._fire_recv(this_ptr, ecs);
  173. derive.buffer().consume(derive.buffer().size());
  174. derive._post_recv(std::move(this_ptr), std::move(ecs));
  175. }
  176. }
  177. template<typename C>
  178. void _http_client_handle_recv(
  179. const error_code& ec, std::size_t bytes_recvd,
  180. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  181. {
  182. detail::ignore_unused(ec, bytes_recvd);
  183. derived_t& derive = static_cast<derived_t&>(*this);
  184. derive._fire_recv(this_ptr, ecs);
  185. derive._post_recv(std::move(this_ptr), std::move(ecs));
  186. }
  187. template<typename C>
  188. void _http_handle_recv(
  189. const error_code& ec, std::size_t bytes_recvd,
  190. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  191. {
  192. derived_t& derive = static_cast<derived_t&>(*this);
  193. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  194. set_last_error(ec);
  195. if (!derive.is_started())
  196. {
  197. if (derive.state_ == state_t::started)
  198. {
  199. ASIO2_LOG_INFOR("_http_handle_recv with closed socket: {} {}", ec.value(), ec.message());
  200. derive._do_disconnect(ec, this_ptr);
  201. }
  202. derive._stop_readend_timer(std::move(this_ptr));
  203. return;
  204. }
  205. if (!ec)
  206. {
  207. // every times recv data,we update the last alive time.
  208. derive.update_alive_time();
  209. if constexpr (args_t::is_session)
  210. {
  211. derive._http_session_handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  212. }
  213. else
  214. {
  215. derive._http_client_handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  216. }
  217. }
  218. else
  219. {
  220. ASIO2_LOG_DEBUG("_http_handle_recv with error: {} {}", ec.value(), ec.message());
  221. // This means they closed the connection
  222. //if (ec == http::error::end_of_stream)
  223. derive._do_disconnect(ec, this_ptr);
  224. derive._stop_readend_timer(std::move(this_ptr));
  225. }
  226. // If an error occurs then no new asynchronous operations are started. This
  227. // means that all shared_ptr references to the connection object will
  228. // disappear and the object will be destroyed automatically after this
  229. // handler returns. The connection class's destructor closes the socket.
  230. }
  231. protected:
  232. };
  233. }
  234. #endif // !__ASIO2_HTTP_RECV_OP_HPP__