message_reader.hpp 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. //
  2. // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
  8. #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
  9. #include <boost/mysql/client_errc.hpp>
  10. #include <boost/mysql/error_code.hpp>
  11. #include <boost/mysql/impl/internal/coroutine.hpp>
  12. #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
  13. #include <boost/mysql/impl/internal/protocol/frame_header.hpp>
  14. #include <boost/mysql/impl/internal/sansio/read_buffer.hpp>
  15. #include <boost/assert.hpp>
  16. #include <boost/config.hpp>
  17. #include <cstddef>
  18. #include <cstdint>
  19. namespace boost {
  20. namespace mysql {
  21. namespace detail {
  22. // Flow:
  23. // Prepare a read operation with prepare_read()
  24. // In a loop, until done():
  25. // prepare_buffer() to resize the buffer to an appropriate size
  26. // Read bytes against buffer()
  27. // Call resume with the number of bytes read
  28. // Or call prepare_read() and check done() to attempt to get a cached message
  29. // (further prepare_read calls should use keep_state=true)
  30. class message_reader
  31. {
  32. public:
  33. message_reader(
  34. std::size_t initial_buffer_size,
  35. std::size_t max_buffer_size = static_cast<std::size_t>(-1),
  36. std::size_t max_frame_size = max_packet_size
  37. )
  38. : buffer_(initial_buffer_size, max_buffer_size), max_frame_size_(max_frame_size)
  39. {
  40. }
  41. void reset()
  42. {
  43. buffer_.reset();
  44. state_ = parse_state();
  45. }
  46. std::size_t max_buffer_size() const { return buffer_.max_size(); }
  47. // Prepares a read operation. sequence_number should be kept alive until
  48. // the next read is prepared or no more calls to resume() are expected.
  49. // If keep_state=true, and the op is not complete, parsing state is preserved
  50. void prepare_read(std::uint8_t& sequence_number, bool keep_state = false)
  51. {
  52. if (!keep_state || done())
  53. state_ = parse_state(sequence_number);
  54. else
  55. state_.sequence_number = &sequence_number;
  56. resume(0);
  57. }
  58. // Is parsing the current message done?
  59. bool done() const { return state_.resume_point == -1; }
  60. // Returns any errors generated during parsing. Requires this->done()
  61. error_code error() const
  62. {
  63. BOOST_ASSERT(done());
  64. return state_.ec;
  65. }
  66. // Returns the last parsed message. Valid until prepare_buffer()
  67. // is next called. Requires done() && !error()
  68. span<const std::uint8_t> message() const
  69. {
  70. BOOST_ASSERT(done());
  71. BOOST_ASSERT(!error());
  72. return buffer_.current_message();
  73. }
  74. // Returns buffer space suitable to read bytes to
  75. span<std::uint8_t> buffer() { return buffer_.free_area(); }
  76. // Removes old messages stored in the buffer, and resizes it, if required, to accomodate
  77. // the message currently being parsed.
  78. BOOST_ATTRIBUTE_NODISCARD
  79. error_code prepare_buffer()
  80. {
  81. buffer_.remove_reserved();
  82. auto ec = buffer_.grow_to_fit(state_.required_size);
  83. if (ec)
  84. return ec;
  85. state_.required_size = 0;
  86. return error_code();
  87. }
  88. // The main operation. Call it after reading bytes against buffer(),
  89. // with the number of bytes read
  90. void resume(std::size_t bytes_read)
  91. {
  92. frame_header header{};
  93. buffer_.move_to_pending(bytes_read);
  94. switch (state_.resume_point)
  95. {
  96. case 0:
  97. // Move the previously parsed message to the reserved area, if any
  98. buffer_.move_to_reserved(buffer_.current_message_size());
  99. while (true)
  100. {
  101. // Read the header
  102. set_required_size(frame_header_size);
  103. while (buffer_.pending_size() < frame_header_size)
  104. BOOST_MYSQL_YIELD_VOID(state_.resume_point, 1)
  105. // Mark the header as belonging to the current message
  106. buffer_.move_to_current_message(frame_header_size);
  107. // Deserialize the header
  108. header = deserialize_frame_header(span<const std::uint8_t, frame_header_size>(
  109. buffer_.pending_first() - frame_header_size,
  110. frame_header_size
  111. ));
  112. // Process the sequence number
  113. if (*state_.sequence_number != header.sequence_number)
  114. {
  115. state_.ec = client_errc::sequence_number_mismatch;
  116. state_.resume_point = -1;
  117. return;
  118. }
  119. ++*state_.sequence_number;
  120. // Process the packet size
  121. state_.body_bytes = header.size;
  122. state_.more_frames_follow = (state_.body_bytes == max_frame_size_);
  123. // We are done with the header
  124. if (state_.is_first_frame)
  125. {
  126. // If it's the 1st frame, we can just move the header bytes to the reserved
  127. // area, avoiding a big memmove
  128. buffer_.move_to_reserved(frame_header_size);
  129. }
  130. else
  131. {
  132. buffer_.remove_current_message_last(frame_header_size);
  133. }
  134. state_.is_first_frame = false;
  135. // Read the body
  136. set_required_size(state_.body_bytes);
  137. while (buffer_.pending_size() < state_.body_bytes)
  138. BOOST_MYSQL_YIELD_VOID(state_.resume_point, 2)
  139. buffer_.move_to_current_message(state_.body_bytes);
  140. // Check if we're done
  141. if (!state_.more_frames_follow)
  142. {
  143. state_.resume_point = -1;
  144. return;
  145. }
  146. }
  147. }
  148. }
  149. // Exposed for testing
  150. const read_buffer& internal_buffer() const { return buffer_; }
  151. private:
  152. read_buffer buffer_;
  153. std::size_t max_frame_size_;
  154. struct parse_state
  155. {
  156. int resume_point{0};
  157. std::uint8_t* sequence_number{};
  158. bool is_first_frame{true};
  159. std::size_t body_bytes{0};
  160. bool more_frames_follow{false};
  161. std::size_t required_size{0};
  162. error_code ec;
  163. parse_state() = default;
  164. parse_state(std::uint8_t& seqnum) noexcept : sequence_number(&seqnum) {}
  165. } state_;
  166. void set_required_size(std::size_t required_bytes)
  167. {
  168. if (required_bytes > buffer_.pending_size())
  169. state_.required_size = required_bytes - buffer_.pending_size();
  170. else
  171. state_.required_size = 0;
  172. }
  173. };
  174. } // namespace detail
  175. } // namespace mysql
  176. } // namespace boost
  177. #endif