123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- //
- // Copyright (c) 2019-2023 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- #ifndef BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_READER_HPP
- #define BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_READER_HPP
- #include <asio2/bho/mysql/client_errc.hpp>
- #include <asio2/bho/mysql/error_code.hpp>
- #include <asio2/bho/mysql/detail/any_stream.hpp>
- #include <asio2/bho/mysql/impl/internal/channel/message_parser.hpp>
- #include <asio2/bho/mysql/impl/internal/channel/read_buffer.hpp>
- #include <asio2/bho/mysql/impl/internal/channel/valgrind.hpp>
- #include <asio2/bho/mysql/impl/internal/protocol/constants.hpp>
- #include <asio/async_result.hpp>
- #include <asio/buffer.hpp>
- #include <asio/compose.hpp>
- #include <asio/coroutine.hpp>
- #include <asio/post.hpp>
- #include <asio2/bho/assert.hpp>
- #include <cstddef>
- #include <cstdint>
- namespace bho {
- namespace mysql {
- namespace detail {
- class message_reader
- {
- public:
- message_reader(std::size_t initial_buffer_size, std::size_t max_frame_size = MAX_PACKET_SIZE)
- : buffer_(initial_buffer_size), parser_(max_frame_size)
- {
- }
- bool has_message() const noexcept { return result_.has_message; }
- span<const std::uint8_t> get_next_message(std::uint8_t& seqnum, error_code& ec) noexcept
- {
- {
- BHO_ASSERT(has_message());
- if (result_.message.has_seqnum_mismatch || seqnum != result_.message.seqnum_first)
- {
- ec = make_error_code(client_errc::sequence_number_mismatch);
- return {};
- }
- seqnum = result_.message.seqnum_last + 1;
- span<const std::uint8_t> res(
- buffer_.current_message_first() - result_.message.size,
- result_.message.size
- );
- parse_message();
- ec = error_code();
- return res;
- }
- }
- // Reads some messages from stream, until there is at least one
- // or an error happens. On success, has_message() returns true
- // and get_next_message() returns the parsed message.
- // May relocate the buffer, modifying buffer_first().
- // The reserved area bytes will be removed before the actual read.
- void read_some(any_stream& stream, error_code& ec)
- {
- // If we already have a message, complete immediately
- if (has_message())
- {
- ec = error_code();
- return;
- }
- // Remove processed messages
- buffer_.remove_reserved();
- while (!has_message())
- {
- // If any previous process_message indicated that we need more
- // buffer space, resize the buffer now
- maybe_resize_buffer();
- // Actually read bytes
- std::size_t bytes_read = stream.read_some(free_area(), ec);
- if (ec)
- break;
- valgrind_make_mem_defined(buffer_.free_first(), bytes_read);
- // Process them
- on_read_bytes(bytes_read);
- }
- }
- template <ASIO_COMPLETION_TOKEN_FOR(void(::bho::mysql::error_code)) CompletionToken>
- ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
- async_read_some(any_stream& stream, CompletionToken&& token);
- // Exposed for the sake of testing
- read_buffer& buffer() noexcept { return buffer_; }
- const read_buffer& buffer() const noexcept { return buffer_; }
- private:
- struct read_some_op;
- read_buffer buffer_;
- message_parser parser_;
- message_parser::result result_;
- void parse_message() { parser_.parse_message(buffer_, result_); }
- void maybe_resize_buffer()
- {
- if (!result_.has_message)
- {
- buffer_.grow_to_fit(result_.required_size);
- }
- }
- void on_read_bytes(size_t num_bytes)
- {
- buffer_.move_to_pending(num_bytes);
- parse_message();
- }
- asio::mutable_buffer free_area() noexcept
- {
- auto res = buffer_.free_area();
- return asio::mutable_buffer(res.data(), res.size());
- }
- };
- struct bho::mysql::detail::message_reader::read_some_op : asio::coroutine
- {
- message_reader& reader_;
- any_stream& stream_;
- read_some_op(message_reader& reader, any_stream& stream) noexcept : reader_(reader), stream_(stream) {}
- template <class Self>
- void operator()(Self& self, error_code ec = {}, std::size_t bytes_read = 0)
- {
- // Error handling
- if (ec)
- {
- self.complete(ec);
- return;
- }
- // Non-error path
- ASIO_CORO_REENTER(*this)
- {
- // If we already have a message, complete immediately
- if (reader_.has_message())
- {
- ASIO_CORO_YIELD asio::post(stream_.get_executor(), std::move(self));
- self.complete(error_code());
- ASIO_CORO_YIELD break;
- }
- // Remove processed messages
- reader_.buffer_.remove_reserved();
- while (!reader_.has_message())
- {
- // If any previous process_message indicated that we need more
- // buffer space, resize the buffer now
- reader_.maybe_resize_buffer();
- // Actually read bytes
- ASIO_CORO_YIELD stream_.async_read_some(reader_.free_area(), std::move(self));
- valgrind_make_mem_defined(reader_.buffer_.free_first(), bytes_read);
- // Process them
- reader_.on_read_bytes(bytes_read);
- }
- self.complete(error_code());
- }
- }
- };
- // Public interface
- inline void read_some_messages(any_stream& stream, message_reader& reader, error_code& ec)
- {
- return reader.read_some(stream, ec);
- }
- template <class CompletionToken>
- ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
- async_read_some_messages(any_stream& stream, message_reader& reader, CompletionToken&& token)
- {
- return reader.async_read_some(stream, std::forward<CompletionToken>(token));
- }
- // Equivalent to read_some + get_next_message
- inline span<const std::uint8_t> read_one_message(
- any_stream& stream,
- message_reader& reader,
- std::uint8_t& seqnum,
- error_code& ec
- )
- {
- read_some_messages(stream, reader, ec);
- if (ec)
- return {};
- else
- return reader.get_next_message(seqnum, ec);
- }
- struct read_one_message_op : asio::coroutine
- {
- message_reader& reader_;
- any_stream& stream_;
- std::uint8_t& seqnum_;
- read_one_message_op(message_reader& reader, any_stream& stream, std::uint8_t& seqnum)
- : reader_(reader), stream_(stream), seqnum_(seqnum)
- {
- }
- template <class Self>
- void operator()(Self& self, error_code code = {})
- {
- // Error handling
- if (code)
- {
- self.complete(code, span<const std::uint8_t>());
- return;
- }
- // Non-error path
- ASIO_CORO_REENTER(*this)
- {
- ASIO_CORO_YIELD reader_.async_read_some(stream_, std::move(self));
- {
- auto b = reader_.get_next_message(seqnum_, code);
- self.complete(code, b);
- }
- }
- }
- };
- template <class CompletionToken>
- ASIO_INITFN_AUTO_RESULT_TYPE(
- CompletionToken,
- void(bho::mysql::error_code, ::bho::span<const std::uint8_t>)
- )
- async_read_one_message(
- any_stream& stream,
- message_reader& reader,
- std::uint8_t& seqnum,
- CompletionToken&& token
- )
- {
- return asio::async_compose<CompletionToken, void(error_code, span<const std::uint8_t>)>(
- read_one_message_op(reader, stream, seqnum),
- token,
- stream
- );
- }
- } // namespace detail
- } // namespace mysql
- } // namespace bho
- template <ASIO_COMPLETION_TOKEN_FOR(void(::bho::mysql::error_code)) CompletionToken>
- ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(::bho::mysql::error_code))
- bho::mysql::detail::message_reader::async_read_some(any_stream& stream, CompletionToken&& token)
- {
- return asio::async_compose<CompletionToken, void(error_code)>(
- read_some_op{*this, stream},
- token,
- stream
- );
- }
#endif /* BHO_MYSQL_IMPL_INTERNAL_CHANNEL_MESSAGE_READER_HPP */
|