message_writer.hpp 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. //
  2. // Copyright (c) 2019-2023 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_WRITER_HPP
  8. #define BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_WRITER_HPP
  9. #include <asio2/bho/mysql/impl/internal/protocol/constants.hpp>
  10. #include <asio2/bho/mysql/impl/internal/protocol/protocol.hpp>
  11. #include <array>
  12. #include <cstddef>
  13. #include <cstdint>
  14. namespace bho {
  15. namespace mysql {
  16. namespace detail {
  17. class chunk_processor
  18. {
  19. std::size_t first_{};
  20. std::size_t last_{};
  21. std::size_t remaining() const noexcept { return last_ - first_; }
  22. public:
  23. chunk_processor() = default;
  24. void reset() noexcept { reset(0, 0); }
  25. void reset(std::size_t first, std::size_t last) noexcept
  26. {
  27. BHO_ASSERT(last >= first);
  28. first_ = first;
  29. last_ = last;
  30. }
  31. void on_bytes_written(std::size_t n) noexcept
  32. {
  33. BHO_ASSERT(remaining() >= n);
  34. first_ += n;
  35. }
  36. bool done() const noexcept { return first_ == last_; }
  37. span<const std::uint8_t> get_chunk(const std::vector<std::uint8_t>& buff) const noexcept
  38. {
  39. BHO_ASSERT(buff.size() >= last_);
  40. return {buff.data() + first_, remaining()};
  41. }
  42. };
  43. class message_writer
  44. {
  45. std::vector<std::uint8_t> buffer_;
  46. std::size_t max_frame_size_;
  47. std::uint8_t* seqnum_{nullptr};
  48. chunk_processor chunk_;
  49. std::size_t total_bytes_{};
  50. std::size_t total_bytes_written_{};
  51. bool should_send_empty_frame_{};
  52. void process_header_write(std::uint32_t size_to_write, std::uint8_t seqnum, std::size_t buff_offset)
  53. {
  54. serialize_frame_header(
  55. frame_header{size_to_write, seqnum},
  56. span<std::uint8_t, frame_header_size>(buffer_.data() + buff_offset, frame_header_size)
  57. );
  58. }
  59. std::uint8_t next_seqnum() noexcept { return (*seqnum_)++; }
  60. void prepare_next_chunk()
  61. {
  62. if (should_send_empty_frame_)
  63. {
  64. process_header_write(0, next_seqnum(), 0);
  65. chunk_.reset(0, HEADER_SIZE);
  66. should_send_empty_frame_ = false;
  67. }
  68. else if (total_bytes_written_ < total_bytes_)
  69. {
  70. std::size_t offset = total_bytes_written_;
  71. std::size_t remaining = total_bytes_ - total_bytes_written_;
  72. std::size_t size = (std::min)(max_frame_size_, remaining);
  73. process_header_write(static_cast<std::uint32_t>(size), next_seqnum(), offset);
  74. chunk_.reset(offset, offset + size + HEADER_SIZE);
  75. if (remaining == max_frame_size_)
  76. {
  77. should_send_empty_frame_ = true;
  78. }
  79. total_bytes_written_ += size;
  80. }
  81. else
  82. {
  83. // We're done
  84. chunk_.reset();
  85. }
  86. }
  87. public:
  88. message_writer(std::size_t max_frame_size = MAX_PACKET_SIZE) noexcept : max_frame_size_(max_frame_size) {}
  89. span<std::uint8_t> prepare_buffer(std::size_t msg_size, std::uint8_t& seqnum)
  90. {
  91. buffer_.resize(msg_size + HEADER_SIZE);
  92. total_bytes_ = msg_size;
  93. total_bytes_written_ = 0;
  94. should_send_empty_frame_ = msg_size == 0;
  95. seqnum_ = &seqnum;
  96. prepare_next_chunk();
  97. return {buffer_.data() + HEADER_SIZE, msg_size};
  98. }
  99. bool done() const noexcept { return chunk_.done(); }
  100. // This function returns an empty buffer to signal that we're done
  101. span<const std::uint8_t> next_chunk() const
  102. {
  103. BHO_ASSERT(!done());
  104. return chunk_.get_chunk(buffer_);
  105. }
  106. void on_bytes_written(std::size_t n)
  107. {
  108. BHO_ASSERT(!done());
  109. // Acknowledge the written bytes
  110. chunk_.on_bytes_written(n);
  111. // Prepare the next chunk, if required
  112. if (chunk_.done())
  113. {
  114. prepare_next_chunk();
  115. }
  116. }
  117. };
  118. } // namespace detail
  119. } // namespace mysql
  120. } // namespace bho
  121. #endif