read_some_rows.hpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. //
  2. // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
  8. #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
  9. #include <boost/mysql/diagnostics.hpp>
  10. #include <boost/mysql/error_code.hpp>
  11. #include <boost/mysql/field_view.hpp>
  12. #include <boost/mysql/detail/algo_params.hpp>
  13. #include <boost/mysql/detail/execution_processor/execution_processor.hpp>
  14. #include <boost/mysql/impl/internal/coroutine.hpp>
  15. #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
  16. #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
  17. #include <cstddef>
  18. namespace boost {
  19. namespace mysql {
  20. namespace detail {
  21. class read_some_rows_algo
  22. {
  23. diagnostics* diag_;
  24. execution_processor* proc_;
  25. output_ref output_;
  26. struct state_t
  27. {
  28. int resume_point{0};
  29. std::size_t rows_read{0};
  30. } state_;
  31. BOOST_ATTRIBUTE_NODISCARD static std::pair<error_code, std::size_t> process_some_rows(
  32. connection_state_data& st,
  33. execution_processor& proc,
  34. output_ref output,
  35. diagnostics& diag
  36. )
  37. {
  38. // Process all read messages until they run out, an error happens
  39. // or an EOF is received
  40. std::size_t read_rows = 0;
  41. error_code err;
  42. proc.on_row_batch_start();
  43. while (true)
  44. {
  45. // Check for errors (like seqnum mismatches)
  46. if (st.reader.error())
  47. return {st.reader.error(), read_rows};
  48. // Get the row message
  49. auto buff = st.reader.message();
  50. // Deserialize it
  51. auto res = deserialize_row_message(buff, st.flavor, diag);
  52. if (res.type == row_message::type_t::error)
  53. {
  54. err = res.data.err;
  55. }
  56. else if (res.type == row_message::type_t::row)
  57. {
  58. output.set_offset(read_rows);
  59. err = proc.on_row(res.data.row, output, st.shared_fields);
  60. if (!err)
  61. ++read_rows;
  62. }
  63. else
  64. {
  65. st.backslash_escapes = res.data.ok_pack.backslash_escapes();
  66. err = proc.on_row_ok_packet(res.data.ok_pack);
  67. }
  68. if (err)
  69. return {err, read_rows};
  70. // TODO: can we make this better?
  71. if (!proc.is_reading_rows() || read_rows >= output.max_size())
  72. break;
  73. // Attempt to parse the next message
  74. st.reader.prepare_read(proc.sequence_number());
  75. if (!st.reader.done())
  76. break;
  77. }
  78. proc.on_row_batch_finish();
  79. return {error_code(), read_rows};
  80. }
  81. public:
  82. read_some_rows_algo(read_some_rows_algo_params params) noexcept
  83. : diag_(params.diag), proc_(params.proc), output_(params.output)
  84. {
  85. }
  86. void reset() { state_ = state_t{}; }
  87. const execution_processor& processor() const { return *proc_; }
  88. execution_processor& processor() { return *proc_; }
  89. next_action resume(connection_state_data& st, error_code ec)
  90. {
  91. if (ec)
  92. return ec;
  93. switch (state_.resume_point)
  94. {
  95. case 0:
  96. // Clear diagnostics
  97. diag_->clear();
  98. // Clear any previous use of shared fields.
  99. // Required for the dynamic version to work.
  100. st.shared_fields.clear();
  101. // If we are not reading rows, return
  102. if (!processor().is_reading_rows())
  103. return next_action();
  104. // Read at least one message. Keep parsing state, in case a previous message
  105. // was parsed partially
  106. BOOST_MYSQL_YIELD(state_.resume_point, 1, st.read(proc_->sequence_number(), true))
  107. // Process messages
  108. std::tie(ec, state_.rows_read) = process_some_rows(st, *proc_, output_, *diag_);
  109. return ec;
  110. }
  111. return next_action();
  112. }
  113. std::size_t result(const connection_state_data&) const { return state_.rows_read; }
  114. };
  115. } // namespace detail
  116. } // namespace mysql
  117. } // namespace boost
  118. #endif