message_reader.hpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. //
  2. // Copyright (c) 2019-2023 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_READER_HPP
  8. #define BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_READER_HPP
  9. #include <asio2/bho/mysql/client_errc.hpp>
  10. #include <asio2/bho/mysql/error_code.hpp>
  11. #include <asio2/bho/mysql/detail/any_stream.hpp>
  12. #include <asio2/bho/mysql/impl/internal/channel/message_parser.hpp>
  13. #include <asio2/bho/mysql/impl/internal/channel/read_buffer.hpp>
  14. #include <asio2/bho/mysql/impl/internal/channel/valgrind.hpp>
  15. #include <asio2/bho/mysql/impl/internal/protocol/constants.hpp>
  16. #include <asio/async_result.hpp>
  17. #include <asio/buffer.hpp>
  18. #include <asio/compose.hpp>
  19. #include <asio/coroutine.hpp>
  20. #include <asio/post.hpp>
  21. #include <asio2/bho/assert.hpp>
  22. #include <cstddef>
  23. #include <cstdint>
  24. namespace bho {
  25. namespace mysql {
  26. namespace detail {
  27. class message_reader
  28. {
  29. public:
  30. message_reader(std::size_t initial_buffer_size, std::size_t max_frame_size = MAX_PACKET_SIZE)
  31. : buffer_(initial_buffer_size), parser_(max_frame_size)
  32. {
  33. }
  34. bool has_message() const noexcept { return result_.has_message; }
  35. span<const std::uint8_t> get_next_message(std::uint8_t& seqnum, error_code& ec) noexcept
  36. {
  37. {
  38. BHO_ASSERT(has_message());
  39. if (result_.message.has_seqnum_mismatch || seqnum != result_.message.seqnum_first)
  40. {
  41. ec = make_error_code(client_errc::sequence_number_mismatch);
  42. return {};
  43. }
  44. seqnum = result_.message.seqnum_last + 1;
  45. span<const std::uint8_t> res(
  46. buffer_.current_message_first() - result_.message.size,
  47. result_.message.size
  48. );
  49. parse_message();
  50. ec = error_code();
  51. return res;
  52. }
  53. }
  54. // Reads some messages from stream, until there is at least one
  55. // or an error happens. On success, has_message() returns true
  56. // and get_next_message() returns the parsed message.
  57. // May relocate the buffer, modifying buffer_first().
  58. // The reserved area bytes will be removed before the actual read.
  59. void read_some(any_stream& stream, error_code& ec)
  60. {
  61. // If we already have a message, complete immediately
  62. if (has_message())
  63. {
  64. ec = error_code();
  65. return;
  66. }
  67. // Remove processed messages
  68. buffer_.remove_reserved();
  69. while (!has_message())
  70. {
  71. // If any previous process_message indicated that we need more
  72. // buffer space, resize the buffer now
  73. maybe_resize_buffer();
  74. // Actually read bytes
  75. std::size_t bytes_read = stream.read_some(free_area(), ec);
  76. if (ec)
  77. break;
  78. valgrind_make_mem_defined(buffer_.free_first(), bytes_read);
  79. // Process them
  80. on_read_bytes(bytes_read);
  81. }
  82. }
  83. template <ASIO_COMPLETION_TOKEN_FOR(void(::bho::mysql::error_code)) CompletionToken>
  84. ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
  85. async_read_some(any_stream& stream, CompletionToken&& token);
  86. // Exposed for the sake of testing
  87. read_buffer& buffer() noexcept { return buffer_; }
  88. const read_buffer& buffer() const noexcept { return buffer_; }
  89. private:
  90. struct read_some_op;
  91. read_buffer buffer_;
  92. message_parser parser_;
  93. message_parser::result result_;
  94. void parse_message() { parser_.parse_message(buffer_, result_); }
  95. void maybe_resize_buffer()
  96. {
  97. if (!result_.has_message)
  98. {
  99. buffer_.grow_to_fit(result_.required_size);
  100. }
  101. }
  102. void on_read_bytes(size_t num_bytes)
  103. {
  104. buffer_.move_to_pending(num_bytes);
  105. parse_message();
  106. }
  107. asio::mutable_buffer free_area() noexcept
  108. {
  109. auto res = buffer_.free_area();
  110. return asio::mutable_buffer(res.data(), res.size());
  111. }
  112. };
  113. struct bho::mysql::detail::message_reader::read_some_op : asio::coroutine
  114. {
  115. message_reader& reader_;
  116. any_stream& stream_;
  117. read_some_op(message_reader& reader, any_stream& stream) noexcept : reader_(reader), stream_(stream) {}
  118. template <class Self>
  119. void operator()(Self& self, error_code ec = {}, std::size_t bytes_read = 0)
  120. {
  121. // Error handling
  122. if (ec)
  123. {
  124. self.complete(ec);
  125. return;
  126. }
  127. // Non-error path
  128. ASIO_CORO_REENTER(*this)
  129. {
  130. // If we already have a message, complete immediately
  131. if (reader_.has_message())
  132. {
  133. ASIO_CORO_YIELD asio::post(stream_.get_executor(), std::move(self));
  134. self.complete(error_code());
  135. ASIO_CORO_YIELD break;
  136. }
  137. // Remove processed messages
  138. reader_.buffer_.remove_reserved();
  139. while (!reader_.has_message())
  140. {
  141. // If any previous process_message indicated that we need more
  142. // buffer space, resize the buffer now
  143. reader_.maybe_resize_buffer();
  144. // Actually read bytes
  145. ASIO_CORO_YIELD stream_.async_read_some(reader_.free_area(), std::move(self));
  146. valgrind_make_mem_defined(reader_.buffer_.free_first(), bytes_read);
  147. // Process them
  148. reader_.on_read_bytes(bytes_read);
  149. }
  150. self.complete(error_code());
  151. }
  152. }
  153. };
  154. // Public interface
  155. inline void read_some_messages(any_stream& stream, message_reader& reader, error_code& ec)
  156. {
  157. return reader.read_some(stream, ec);
  158. }
  159. template <class CompletionToken>
  160. ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
  161. async_read_some_messages(any_stream& stream, message_reader& reader, CompletionToken&& token)
  162. {
  163. return reader.async_read_some(stream, std::forward<CompletionToken>(token));
  164. }
  165. // Equivalent to read_some + get_next_message
  166. inline span<const std::uint8_t> read_one_message(
  167. any_stream& stream,
  168. message_reader& reader,
  169. std::uint8_t& seqnum,
  170. error_code& ec
  171. )
  172. {
  173. read_some_messages(stream, reader, ec);
  174. if (ec)
  175. return {};
  176. else
  177. return reader.get_next_message(seqnum, ec);
  178. }
  179. struct read_one_message_op : asio::coroutine
  180. {
  181. message_reader& reader_;
  182. any_stream& stream_;
  183. std::uint8_t& seqnum_;
  184. read_one_message_op(message_reader& reader, any_stream& stream, std::uint8_t& seqnum)
  185. : reader_(reader), stream_(stream), seqnum_(seqnum)
  186. {
  187. }
  188. template <class Self>
  189. void operator()(Self& self, error_code code = {})
  190. {
  191. // Error handling
  192. if (code)
  193. {
  194. self.complete(code, span<const std::uint8_t>());
  195. return;
  196. }
  197. // Non-error path
  198. ASIO_CORO_REENTER(*this)
  199. {
  200. ASIO_CORO_YIELD reader_.async_read_some(stream_, std::move(self));
  201. {
  202. auto b = reader_.get_next_message(seqnum_, code);
  203. self.complete(code, b);
  204. }
  205. }
  206. }
  207. };
  208. template <class CompletionToken>
  209. ASIO_INITFN_AUTO_RESULT_TYPE(
  210. CompletionToken,
  211. void(bho::mysql::error_code, ::bho::span<const std::uint8_t>)
  212. )
  213. async_read_one_message(
  214. any_stream& stream,
  215. message_reader& reader,
  216. std::uint8_t& seqnum,
  217. CompletionToken&& token
  218. )
  219. {
  220. return asio::async_compose<CompletionToken, void(error_code, span<const std::uint8_t>)>(
  221. read_one_message_op(reader, stream, seqnum),
  222. token,
  223. stream
  224. );
  225. }
  226. } // namespace detail
  227. } // namespace mysql
  228. } // namespace bho
  229. template <ASIO_COMPLETION_TOKEN_FOR(void(::bho::mysql::error_code)) CompletionToken>
  230. ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(::bho::mysql::error_code))
  231. bho::mysql::detail::message_reader::async_read_some(any_stream& stream, CompletionToken&& token)
  232. {
  233. return asio::async_compose<CompletionToken, void(error_code)>(
  234. read_some_op{*this, stream},
  235. token,
  236. stream
  237. );
  238. }
  239. #endif /* BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_READER_HPP */