/*
* Copyright (c) 2017-2023 zhllxt
*
* author : zhllxt
* email : 37792738@qq.com
*
* https://github.com/mcxiaoke/mqtt
* https://github.com/eclipse/paho.mqtt.c
* https://github.com/eclipse/paho.mqtt.cpp
*
* https://github.com/mqtt/mqtt.org/wiki/libraries
*
* Distributed under the Boost Software License, Version 1.0. (See accompanying
* file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*/
#ifndef __ASIO2_MQTT_CORE_HPP__
#define __ASIO2_MQTT_CORE_HPP__
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include <iosfwd>
#include <cstdint>
#include <cstddef>
#include <memory>
#include <chrono>
#include <functional>
#include <atomic>
#include <string>
#include <string_view>
#include <vector>
#include <optional>
#include <array>
#include <variant>
#include <tuple>
#include <type_traits>
#include <asio2/external/predef.h>
#ifdef ASIO2_HEADER_ONLY
#include <asio2/bho/beast/websocket/detail/utf8_checker.hpp>
#else
#include <boost/beast/websocket/detail/utf8_checker.hpp>
#endif
#include <asio2/base/detail/util.hpp>
#include <asio2/mqtt/detail/mqtt_topic_util.hpp>
#include <asio2/mqtt/error.hpp>
/*
* Server Broker Port Websocket
* ------------------------------------------------------------------
* iot.eclipse.org Mosquitto 1883/8883 n/a
* broker.hivemq.com HiveMQ 1883 8000 **
* test.mosquitto.org Mosquitto 1883/8883/8884 8080/8081
* test.mosca.io mosca 1883 80
* broker.mqttdashboard.com HiveMQ 1883
*
*/
//namespace asio2::detail
//{
// template <class> class mqtt_handler_t;
// template <class> class mqtt_invoker_t;
// template <class> class mqtt_aop_connect;
// template <class> class mqtt_aop_publish;
//}
namespace asio2::mqtt
{
static constexpr unsigned int max_payload = 268435455u;
enum class version : std::uint8_t
{
// mqtt version 3.1 , The protocol version of the mqtt 3.1 CONNECT message is 0x03
v3 = 3,
// mqtt version 3.1.1, The protocol version of the mqtt 3.1.1 CONNECT message is 0x04
v4 = 4,
// mqtt version 5.0 , The protocol version of the mqtt 5.0 CONNECT message is 0x05
v5 = 5
};
/**
* MQTT Control Packet type
*
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901022
*/
enum class control_packet_type : std::uint8_t
{
// [Forbidden]
// Reserved
reserved = 0,
// [Client to Server]
// Client request to connect to Server
connect = 1,
// [Server to Client]
// Connect acknowledgment
connack = 2,
// [Client to Server] or [Server to Client]
// Publish message
publish = 3,
// [Client to Server] or [Server to Client]
// Publish acknowledgment
puback = 4,
// [Client to Server] or [Server to Client]
// Publish received (assured delivery part 1)
pubrec = 5,
// [Client to Server] or [Server to Client]
// Publish release (assured delivery part 2)
pubrel = 6,
// [Client to Server] or [Server to Client]
// Publish complete (assured delivery part 3)
pubcomp = 7,
// [Client to Server]
// Client subscribe request
subscribe = 8,
// [Server to Client]
// Subscribe acknowledgment
suback = 9,
// [Client to Server]
// Unsubscribe request
unsubscribe = 10,
// [Server to Client]
// Unsubscribe acknowledgment
unsuback = 11,
// [Client to Server]
// PING request
pingreq = 12,
// [Server to Client]
// PING response
pingresp = 13,
// [Client to Server]
// Client is disconnecting
disconnect = 14,
// [Client to Server] or [Server to Client]
// Authentication exchange
// Only valid in mqtt 5.0
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901256
auth = 15,
};
/**
* Data representation
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901006
*/
enum class representation_type : std::uint8_t
{
one_byte_integer,
two_byte_integer,
four_byte_integer,
variable_byte_integer,
binary_data,
utf8_string,
utf8_string_pair
};
/**
* This field indicates the level of assurance for delivery of an Application Message.
* The QoS levels are shown below.
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901103
*/
enum class qos_type : std::uint8_t
{
at_most_once = 0, // At most once delivery
at_least_once = 1, // At least once delivery
exactly_once = 2, // Exactly once delivery
};
static constexpr std::uint8_t qos_min_value = static_cast<std::uint8_t>(qos_type::at_most_once);
static constexpr std::uint8_t qos_max_value = static_cast<std::uint8_t>(qos_type::exactly_once);
template<class QosOrInt>
typename std::enable_if_t<
std::is_same_v<asio2::detail::remove_cvref_t<QosOrInt>, mqtt::qos_type> ||
std::is_integral_v<asio2::detail::remove_cvref_t<QosOrInt>>, bool>
inline is_valid_qos(QosOrInt q)
{
return (static_cast<std::uint8_t>(q) >= qos_min_value && static_cast<std::uint8_t>(q) <= qos_max_value);
}
/**
* Bits 4 and 5 of the Subscription Options represent the Retain Handling option.
* This option specifies whether retained messages are sent when the subscription is established.
* This does not affect the sending of retained messages at any point after the subscribe.
* If there are no retained messages matching the Topic Filter, all of these values act the same.
* The values are:
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901169
*/
enum class retain_handling_type : std::uint8_t
{
// Send retained messages at the time of the subscribe
send = 0,
// Send retained messages at subscribe only if the subscription does not currently exist
send_only_new_subscription = 1,
// Do not send retained messages at the time of the subscribe
not_send = 2,
};
/*
* The algorithm for encoding a non-negative integer (X) into the Variable Byte Integer encoding scheme is as follows:
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
*/
template<class Integer>
std::array<std::uint8_t, 4> encode_variable_byte_integer(Integer X)
{
std::int32_t i = 0;
std::array<std::uint8_t, 4> value{};
if (static_cast<std::size_t>(X) > static_cast<std::size_t>(mqtt::max_payload))
{
asio2::set_last_error(asio::error::invalid_argument);
return value;
}
asio2::clear_last_error();
do
{
std::uint8_t encodedByte = X % 128;
X = X / 128;
// if there are more data to encode, set the top bit of this byte
if (X > 0)
{
encodedByte = encodedByte | 128;
}
// 'output' encodedByte
value[i] = encodedByte;
++i;
} while (X > 0);
return value;
}
template<class Integer>
inline bool check_size(Integer size)
{
return (std::size_t(size) <= std::size_t(65535));
}
template<class String>
inline bool check_utf8(String& str)
{
#ifdef ASIO2_HEADER_ONLY
namespace beast = ::bho::beast;
#else
namespace beast = ::boost::beast;
#endif
#if defined(ASIO2_CHECK_UTF8)
return beast::websocket::detail::check_utf8(str.data(), str.size());
#else
asio2::detail::ignore_unused(str);
return true;
#endif
}
/*
* @return pair.first - the integer value, pair.second - number of bytes
*
* The algorithm for decoding a Variable Byte Integer type is as follows:
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
*/
template<class BufferContainer>
std::pair<std::int32_t, std::int32_t> decode_variable_byte_integer(BufferContainer X)
{
std::int32_t i = 0;
std::int32_t multiplier = 1;
std::int32_t value = 0;
std::uint8_t encodedByte;
do
{
if (X.size() < std::size_t(i + 1))
{
i = 0;
value = 0;
asio2::set_last_error(asio::error::no_buffer_space);
return { value, i };
}
encodedByte = static_cast<std::uint8_t>(X[i]); // 'next byte from stream'
++i;
value += (encodedByte & 127) * multiplier;
if (multiplier > 128 * 128 * 128)
{
i = 0;
value = 0;
asio2::set_last_error(asio::error::invalid_argument);
return { value, i };
}
multiplier *= 128;
} while ((encodedByte & 128) != 0);
asio2::clear_last_error();
// When this algorithm terminates, value contains the Variable Byte Integer value.
return { value, i };
}
/**
* Bits in a byte are labelled 7 to 0. Bit number 7 is the most significant bit,
* the least significant bit is assigned bit number 0.
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901007
*/
class one_byte_integer
{
public:
using value_type = std::uint8_t;
one_byte_integer() = default;
explicit one_byte_integer(std::uint8_t v) : value_(v)
{
}
inline std::uint8_t value() const noexcept
{
return value_;
}
inline operator std::uint8_t() const noexcept { return value(); }
inline one_byte_integer& operator=(std::uint8_t v)
{
value_ = v;
return (*this);
}
inline constexpr std::size_t required_size() const noexcept
{
return (sizeof(std::uint8_t));
}
inline constexpr std::size_t size() const noexcept
{
return (sizeof(std::uint8_t));
}
inline one_byte_integer& serialize(std::vector<asio::const_buffer>& buffers)
{
buffers.emplace_back(std::addressof(value_), required_size());
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline one_byte_integer& serialize(Container& buffer)
{
static_assert(sizeof(typename Container::value_type) == std::size_t(1));
auto* p = reinterpret_cast<typename Container::const_pointer>(std::addressof(value_));
buffer.insert(buffer.end(), p, p + required_size());
return (*this);
}
inline one_byte_integer& deserialize(std::string_view& data)
{
if (data.size() < required_size())
{
set_last_error(mqtt::make_error_code(mqtt::error::malformed_packet));
return (*this);
}
asio2::clear_last_error();
value_ = data[0];
data.remove_prefix(required_size());
return (*this);
}
protected:
std::uint8_t value_{ 0 };
};
/**
* Two Byte Integer data values are 16-bit unsigned integers in big-endian order: the high order
* byte precedes the lower order byte. This means that a 16-bit word is presented on the network
* as Most Significant Byte (MSB), followed by Least Significant Byte (LSB).
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901008
*/
class two_byte_integer
{
public:
using value_type = std::uint16_t;
two_byte_integer() = default;
explicit two_byte_integer(std::uint16_t v)
{
*this = v;
}
inline std::uint16_t value() const noexcept
{
#if ASIO2_ENDIAN_BIG_BYTE
return value_;
#else
return (((value_ >> 8) & 0x00ff) | ((value_ << 8) & 0xff00));
#endif
}
inline operator std::uint16_t() const noexcept { return value(); }
inline two_byte_integer& operator=(std::uint16_t v)
{
#if ASIO2_ENDIAN_BIG_BYTE
value_ = v;
#else
value_ = ((v >> 8) & 0x00ff) | ((v << 8) & 0xff00);
#endif
return (*this);
}
inline constexpr std::size_t required_size() const noexcept
{
return (sizeof(std::uint16_t));
}
inline constexpr std::size_t size() const noexcept
{
return (sizeof(std::uint16_t));
}
inline two_byte_integer& serialize(std::vector<asio::const_buffer>& buffers)
{
buffers.emplace_back(std::addressof(value_), required_size());
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline two_byte_integer& serialize(Container& buffer)
{
static_assert(sizeof(typename Container::value_type) == std::size_t(1));
auto* p = reinterpret_cast<typename Container::const_pointer>(std::addressof(value_));
buffer.insert(buffer.end(), p, p + required_size());
return (*this);
}
inline two_byte_integer& deserialize(std::string_view& data)
{
if (data.size() < required_size())
{
set_last_error(mqtt::make_error_code(mqtt::error::malformed_packet));
return (*this);
}
asio2::clear_last_error();
value_ = ((data[1] << 8) & 0xff00) | ((data[0] << 0) & 0x00ff);
data.remove_prefix(required_size());
return (*this);
}
protected:
std::uint16_t value_{ 0 };
};
/**
* Four Byte Integer data values are 32-bit unsigned integers in big-endian order: the high order
* byte precedes the successively lower order bytes. This means that a 32-bit word is presented on
* the network as Most Significant Byte (MSB), followed by the next most Significant Byte (MSB),
* followed by the next most Significant Byte (MSB), followed by Least Significant Byte (LSB).
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901009
*/
class four_byte_integer
{
public:
using value_type = std::uint32_t;
four_byte_integer() = default;
explicit four_byte_integer(std::uint32_t v)
{
*this = v;
}
inline std::uint32_t value() const noexcept
{
#if ASIO2_ENDIAN_BIG_BYTE
return value_;
#else
return (
((value_ >> 24) & 0x000000ff) |
((value_ << 24) & 0xff000000) |
((value_ >> 8) & 0x0000ff00) |
((value_ << 8) & 0x00ff0000));
#endif
}
inline operator std::uint32_t() const noexcept { return value(); }
inline four_byte_integer& operator=(std::uint32_t v)
{
#if ASIO2_ENDIAN_BIG_BYTE
value_ = v;
#else
value_ = ((v >> 24) & 0x000000ff) | ((v << 24) & 0xff000000) | ((v >> 8) & 0x0000ff00) | ((v << 8) & 0x00ff0000);
#endif
return (*this);
}
inline constexpr std::size_t required_size() const noexcept
{
return (sizeof(std::uint32_t));
}
inline constexpr std::size_t size() const noexcept
{
return (sizeof(std::uint32_t));
}
inline four_byte_integer& serialize(std::vector<asio::const_buffer>& buffers)
{
buffers.emplace_back(std::addressof(value_), required_size());
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline four_byte_integer& serialize(Container& buffer)
{
static_assert(sizeof(typename Container::value_type) == std::size_t(1));
auto* p = reinterpret_cast<typename Container::const_pointer>(std::addressof(value_));
buffer.insert(buffer.end(), p, p + required_size());
return (*this);
}
inline four_byte_integer& deserialize(std::string_view& data)
{
if (data.size() < required_size())
{
set_last_error(mqtt::make_error_code(mqtt::error::malformed_packet));
return (*this);
}
asio2::clear_last_error();
value_ =
((data[3] << 24) & 0xff000000) |
((data[2] << 16) & 0x00ff0000) |
((data[1] << 8) & 0x0000ff00) |
((data[0] << 0) & 0x000000ff);
data.remove_prefix(required_size());
return (*this);
}
protected:
std::uint32_t value_{ 0 };
};
/**
* The Variable Byte Integer is encoded using an encoding scheme which uses a single byte for values up to 127.
* Larger values are handled as follows. The least significant seven bits of each byte encode the data, and
* the most significant bit is used to indicate whether there are bytes following in the representation.
* Thus, each byte encodes 128 values and a "continuation bit".
* The maximum number of bytes in the Variable Byte Integer field is four.
* The encoded value MUST use the minimum number of bytes necessary to represent the value [MQTT-1.5.5-1].
* This is shown in Table 1?1 Size of Variable Byte Integer.
*
* Table 1-1 Size of Variable Byte Integer
* +---------+-------------------------------------+---------------------------------------+
* | Digits | From | To |
* +---------+-------------------------------------+---------------------------------------+
* | 1 | 0 (0x00) | 127 (0x7F) |
* +---------+-------------------------------------+---------------------------------------+
* | 2 | 128 (0x80, 0x01) | 16,383 (0xFF, 0x7F) |
* +---------+-------------------------------------+---------------------------------------+
* | 3 | 16,384 (0x80, 0x80, 0x01) | 2,097,151 (0xFF, 0xFF, 0x7F) |
* +---------+-------------------------------------+---------------------------------------+
* | 4 | 2,097,152 (0x80, 0x80, 0x80, 0x01) | 268,435,455 (0xFF, 0xFF, 0xFF, 0x7F) |
* +---------+-------------------------------------+---------------------------------------+
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
*/
class variable_byte_integer
{
public:
using value_type = std::int32_t;
variable_byte_integer() = default;
explicit variable_byte_integer(std::int32_t v)
{
*this = v;
}
inline std::int32_t value() const
{
return std::get<0>(decode_variable_byte_integer(value_));
}
inline operator std::int32_t() const { return value(); }
inline variable_byte_integer& operator=(std::int32_t v)
{
value_ = encode_variable_byte_integer(v);
return (*this);
}
inline std::size_t required_size() const
{
return std::get<1>(decode_variable_byte_integer(value_));
}
inline std::size_t size() const
{
return std::get<1>(decode_variable_byte_integer(value_));
}
inline variable_byte_integer& serialize(std::vector<asio::const_buffer>& buffers)
{
buffers.emplace_back(value_.data(), required_size());
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline variable_byte_integer& serialize(Container& buffer)
{
static_assert(sizeof(typename Container::value_type) == std::size_t(1));
auto* p = reinterpret_cast<typename Container::const_pointer>(value_.data());
buffer.insert(buffer.end(), p, p + required_size());
return (*this);
}
inline variable_byte_integer& deserialize(std::string_view& data)
{
auto[value, bytes] = decode_variable_byte_integer(data);
if (asio2::get_last_error())
return (*this);
asio2::detail::ignore_unused(value);
std::memcpy((void*)value_.data(), (const void*)data.data(), bytes);
data.remove_prefix(bytes);
return (*this);
}
protected:
// The Variable Byte Integer is encoded using an encoding scheme which uses a single byte for values
// up to 127. Larger values are handled as follows. The least significant seven bits of each byte
// encode the data, and the most significant bit is used to indicate whether there are bytes following
// in the representation. Thus, each byte encodes 128 values and a "continuation bit". The maximum
// number of bytes in the Variable Byte Integer field is four. The encoded value MUST use the minimum
// number of bytes necessary to represent the value [MQTT-1.5.5-1].
std::array<std::uint8_t, 4> value_{};
};
/**
* UTF-8 Encoded String
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901010
*/
class utf8_string
{
public:
using value_type = std::string;
using view_type = std::string_view;
utf8_string() = default;
utf8_string(utf8_string const& o) : length_(o.length_), data_(o.data_)
{
//asio2::detail::disable_sso(data_);
}
utf8_string& operator=(utf8_string const& o)
{
length_ = o.length_;
data_ = o.data_;
//asio2::detail::disable_sso(data_);
return (*this);
}
explicit utf8_string(const char* const s)
{
*this = std::string{ s };
}
explicit utf8_string(std::string s)
{
*this = std::move(s);
}
explicit utf8_string(std::string_view s)
{
*this = std::move(s);
}
inline utf8_string& operator=(std::string s)
{
if (!check_size(s.size()))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
if (!check_utf8(s))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ = std::move(s);
length_ = static_cast<std::uint16_t>(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
inline utf8_string& operator=(std::string_view s)
{
*this = std::string{ s };
return (*this);
}
inline utf8_string& operator=(const char* const s)
{
*this = std::string{ s };
return (*this);
}
inline bool operator==(const std::string & s) noexcept { return (data_ == s); }
inline bool operator==(const std::string_view & s) noexcept { return (data_ == s); }
inline bool operator!=(const std::string & s) noexcept { return (data_ != s); }
inline bool operator!=(const std::string_view & s) noexcept { return (data_ != s); }
inline utf8_string& operator+=(const std::string & s)
{
if (!check_size(data_.size() + s.size()))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
if (!check_utf8(s))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ += s;
length_ = static_cast<std::uint16_t>(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
inline utf8_string& operator+=(const std::string_view & s)
{
if (!check_size(data_.size() + s.size()))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
if (!check_utf8(s))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ += s;
length_ = static_cast<std::uint16_t>(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
inline utf8_string& operator+=(const char* const s)
{
this->operator+=(std::string_view{ s });
return (*this);
}
// https://stackoverflow.com/questions/56833000/c20-with-u8-char8-t-and-stdstring
#if defined(__cpp_lib_char8_t)
explicit utf8_string(const char8_t* const s)
{
*this = std::u8string_view{ s };
}
explicit utf8_string(const std::u8string& s)
{
*this = s;
}
explicit utf8_string(const std::u8string_view& s)
{
*this = s;
}
inline utf8_string& operator=(const std::u8string& s)
{
*this = std::string{ s.begin(), s.end() };
return (*this);
}
inline utf8_string& operator=(const std::u8string_view& s)
{
*this = std::string{ s.begin(), s.end() };
return (*this);
}
inline utf8_string& operator=(const char8_t* const s)
{
*this = std::u8string_view{ s };
return (*this);
}
inline bool operator==(const std::u8string & s) noexcept { return (data_ == std::string{s.begin(), s.end()}); }
inline bool operator==(const std::u8string_view & s) noexcept { return (data_ == std::string{s.begin(), s.end()}); }
inline bool operator!=(const std::u8string & s) noexcept { return (data_ != std::string{s.begin(), s.end()}); }
inline bool operator!=(const std::u8string_view & s) noexcept { return (data_ != std::string{s.begin(), s.end()}); }
inline utf8_string& operator+=(const std::u8string & s)
{
*this += std::string{ s.begin(), s.end() };
return (*this);
}
inline utf8_string& operator+=(const std::u8string_view & s)
{
*this += std::string{ s.begin(), s.end() };
return (*this);
}
inline utf8_string& operator+=(const char8_t* const s)
{
*this += std::u8string_view{ s };
return (*this);
}
#endif
inline std::size_t required_size() const noexcept
{
return (length_.required_size() + data_.size());
}
inline std::size_t size() const noexcept
{
return (data_.size());
}
inline bool empty() const noexcept { return data_.empty(); }
inline std::string data () const { return data_; }
inline std::string_view data_view() const { return data_; }
inline operator std::string () const { return data_; }
inline operator std::string_view () const { return data_; }
inline utf8_string& serialize(std::vector<asio::const_buffer>& buffers)
{
length_.serialize(buffers);
buffers.emplace_back(asio::buffer(data_));
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline utf8_string& serialize(Container& buffer)
{
length_.serialize(buffer);
buffer.insert(buffer.end(), data_.begin(), data_.end());
return (*this);
}
inline utf8_string& deserialize(std::string_view& data)
{
length_.deserialize(data);
if (asio2::get_last_error())
return (*this);
if (data.size() < length_.value())
{
set_last_error(mqtt::make_error_code(mqtt::error::malformed_packet));
return (*this);
}
asio2::clear_last_error();
data_ = data.substr(0, length_.value());
data.remove_prefix(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
protected:
// Each of these strings is prefixed with a Two Byte Integer length field that gives the number of
// bytes in a UTF-8 encoded string itself, as illustrated in Figure 1.1 Structure of UTF-8 Encoded
// Strings below. Consequently, the maximum size of a UTF-8 Encoded String is 65,535 bytes.
two_byte_integer length_{};
// UTF-8 encoded character data, if length > 0.
std::string data_ {};
};
/**
* Binary Data
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901012
*/
class binary_data
{
public:
using value_type = std::string;
using view_type = std::string_view;
binary_data() = default;
binary_data(binary_data const& o) : length_(o.length_), data_(o.data_)
{
//asio2::detail::disable_sso(data_);
}
binary_data& operator=(binary_data const& o)
{
length_ = o.length_;
data_ = o.data_;
//asio2::detail::disable_sso(data_);
return (*this);
}
explicit binary_data(const char* const s)
{
*this = std::string{ s };
}
explicit binary_data(std::string s)
{
*this = std::move(s);
}
explicit binary_data(std::string_view s)
{
*this = std::move(s);
}
inline binary_data& operator=(std::string s)
{
if (!check_size(s.size()))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ = std::move(s);
length_ = static_cast<std::uint16_t>(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
inline binary_data& operator=(std::string_view s)
{
*this = std::string{ s };
return (*this);
}
inline binary_data& operator=(const char* const s)
{
*this = std::string{ s };
return (*this);
}
inline bool operator==(const std::string & s) noexcept { return (data_ == s); }
inline bool operator==(const std::string_view & s) noexcept { return (data_ == s); }
inline bool operator!=(const std::string & s) noexcept { return (data_ != s); }
inline bool operator!=(const std::string_view & s) noexcept { return (data_ != s); }
inline binary_data& operator+=(const std::string & s)
{
if (!check_size(data_.size() + s.size()))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ += s;
length_ = static_cast<std::uint16_t>(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
inline binary_data& operator+=(const std::string_view & s)
{
if (!check_size(data_.size() + s.size()))
{
asio2::set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ += s;
length_ = static_cast<std::uint16_t>(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
inline binary_data& operator+=(const char* const s)
{
this->operator+=(std::string_view{ s });
return (*this);
}
#if defined(__cpp_lib_char8_t)
explicit binary_data(const char8_t* const s)
{
*this = std::u8string{ s };
}
explicit binary_data(const std::u8string& s)
{
*this = s;
}
explicit binary_data(const std::u8string_view& s)
{
*this = s;
}
inline binary_data& operator=(const std::u8string& s)
{
*this = std::string{ s.begin(), s.end() };
return (*this);
}
inline binary_data& operator=(const std::u8string_view& s)
{
*this = std::string{ s.begin(), s.end() };
return (*this);
}
inline binary_data& operator=(const char8_t* const s)
{
*this = std::u8string_view{ s };
return (*this);
}
inline bool operator==(const std::u8string & s) noexcept { return (data_ == std::string(s.begin(), s.end())); }
inline bool operator==(const std::u8string_view & s) noexcept { return (data_ == std::string(s.begin(), s.end())); }
inline bool operator!=(const std::u8string & s) noexcept { return (data_ != std::string(s.begin(), s.end())); }
inline bool operator!=(const std::u8string_view & s) noexcept { return (data_ != std::string(s.begin(), s.end())); }
inline binary_data& operator+=(const std::u8string & s)
{
*this += std::string{ s.begin(), s.end() };
return (*this);
}
inline binary_data& operator+=(const std::u8string_view & s)
{
*this += std::string{ s.begin(), s.end() };
return (*this);
}
inline binary_data& operator+=(const char8_t* const s)
{
*this += std::u8string_view{ s };
return (*this);
}
#endif
inline std::size_t required_size() const noexcept
{
return (length_.required_size() + data_.size());
}
inline std::size_t size() const noexcept
{
return (data_.size());
}
inline bool empty() const noexcept { return data_.empty(); }
inline std::string data () const { return data_; }
inline std::string_view data_view() const { return data_; }
inline operator std::string () const { return data_; }
inline operator std::string_view () const { return data_; }
inline binary_data& serialize(std::vector<asio::const_buffer>& buffers)
{
length_.serialize(buffers);
buffers.emplace_back(asio::buffer(data_));
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline binary_data& serialize(Container& buffer)
{
length_.serialize(buffer);
buffer.insert(buffer.end(), data_.begin(), data_.end());
return (*this);
}
inline binary_data& deserialize(std::string_view& data)
{
length_.deserialize(data);
if (asio2::get_last_error())
return (*this);
if (data.size() < length_.value())
{
set_last_error(mqtt::make_error_code(mqtt::error::malformed_packet));
return (*this);
}
asio2::clear_last_error();
data_ = data.substr(0, length_.value());
data.remove_prefix(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
protected:
// Binary Data is represented by a Two Byte Integer length which indicates the number of data bytes,
// followed by that number of bytes. Thus, the length of Binary Data is limited to the range of 0 to
// 65,535 Bytes.
two_byte_integer length_{};
// number of bytes
std::string data_ {};
};
/**
* A UTF-8 String Pair consists of two UTF-8 Encoded Strings.
* This data type is used to hold name-value pairs.
* The first string serves as the name, and the second string contains the value.
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901013
*/
class utf8_string_pair
{
public:
using value_type = std::pair<std::string, std::string>;
using view_type = std::pair<std::string_view, std::string_view>;
utf8_string_pair() = default;
explicit utf8_string_pair(const char* const key, const char* const val)
: key_(std::move(key)), val_(std::move(val))
{
}
explicit utf8_string_pair(std::string key, std::string val)
: key_(std::move(key)), val_(std::move(val))
{
}
explicit utf8_string_pair(std::string_view key, std::string_view val)
: key_(std::move(key)), val_(std::move(val))
{
}
inline utf8_string_pair& operator=(std::tuple<std::string, std::string> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
inline utf8_string_pair& operator=(std::pair<std::string, std::string> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
inline utf8_string_pair& operator=(std::tuple<std::string_view, std::string_view> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
inline utf8_string_pair& operator=(std::pair<std::string_view, std::string_view> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
#if defined(__cpp_lib_char8_t)
explicit utf8_string_pair(const char8_t* const key, const char8_t* const val)
: key_(std::move(key)), val_(std::move(val))
{
}
explicit utf8_string_pair(const std::u8string& key, const std::u8string& val)
: key_(key), val_(val)
{
}
explicit utf8_string_pair(const std::u8string_view& key, const std::u8string_view& val)
: key_(key), val_(val)
{
}
inline utf8_string_pair& operator=(std::tuple<std::u8string, std::u8string> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
inline utf8_string_pair& operator=(std::pair<std::u8string, std::u8string> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
inline utf8_string_pair& operator=(std::tuple<std::u8string_view, std::u8string_view> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
inline utf8_string_pair& operator=(std::pair<std::u8string_view, std::u8string_view> kv)
{
key_ = std::move(std::get<0>(kv));
val_ = std::move(std::get<1>(kv));
return (*this);
}
#endif
inline std::size_t required_size() const noexcept
{
return (key_.required_size() + val_.required_size());
}
inline std::size_t size() const noexcept
{
return (key_.size() + val_.size());
}
inline std::string key() const { return key_.data(); }
inline std::string val() const { return val_.data(); }
inline std::string_view key_view() const { return key_.data_view(); }
inline std::string_view val_view() const { return val_.data_view(); }
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline utf8_string_pair& serialize(Container& buffer)
{
key_.serialize(buffer);
val_.serialize(buffer);
return (*this);
}
inline utf8_string_pair& deserialize(std::string_view& data)
{
key_.deserialize(data);
if (asio2::get_last_error())
return (*this);
val_.deserialize(data);
return (*this);
}
protected:
utf8_string key_{};
utf8_string val_{};
};
/**
* The Payload contains the Application Message that is being published.
* The content and format of the data is application specific.
* The length of the Payload can be calculated by subtracting the length of the Variable Header
* from the Remaining Length field that is in the Fixed Header.
* It is valid for a PUBLISH packet to contain a zero length Payload.
*
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901119
*/
class application_message
{
public:
using value_type = std::string;
using view_type = std::string_view;
application_message() = default;
application_message(application_message const& o) : data_(o.data_)
{
//asio2::detail::disable_sso(data_);
}
application_message& operator=(application_message const& o)
{
data_ = o.data_;
//asio2::detail::disable_sso(data_);
return (*this);
}
explicit application_message(const char* const s)
{
*this = std::string{ s };
}
explicit application_message(std::string s)
{
*this = std::move(s);
}
explicit application_message(std::string_view s)
{
*this = std::move(s);
}
inline application_message& operator=(std::string s)
{
if (s.size() > std::size_t(max_payload))
{
set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ = std::move(s);
//asio2::detail::disable_sso(data_);
return (*this);
}
inline application_message& operator=(std::string_view s)
{
*this = std::string{ s };
return (*this);
}
inline application_message& operator=(const char* const s)
{
*this = std::string{ s };
return (*this);
}
inline bool operator==(const std::string & s) noexcept { return (data_ == s); }
inline bool operator==(const std::string_view & s) noexcept { return (data_ == s); }
inline bool operator!=(const std::string & s) noexcept { return (data_ != s); }
inline bool operator!=(const std::string_view & s) noexcept { return (data_ != s); }
inline application_message& operator+=(const std::string & s)
{
if (data_.size() + s.size() > std::size_t(max_payload))
{
set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ += s;
//asio2::detail::disable_sso(data_);
return (*this);
}
inline application_message& operator+=(const std::string_view & s)
{
if (data_.size() + s.size() > std::size_t(max_payload))
{
set_last_error(asio::error::invalid_argument);
return (*this);
}
asio2::clear_last_error();
data_ += s;
//asio2::detail::disable_sso(data_);
return (*this);
}
inline application_message& operator+=(const char* const s)
{
this->operator+=(std::string_view{ s });
return (*this);
}
#if defined(__cpp_lib_char8_t)
explicit application_message(const char8_t* const s)
{
*this = std::u8string{ s };
}
explicit application_message(const std::u8string& s)
{
*this = s;
}
explicit application_message(const std::u8string_view& s)
{
*this = s;
}
inline application_message& operator=(const std::u8string& s)
{
*this = std::string(s.begin(), s.end());
return (*this);
}
inline application_message& operator=(const std::u8string_view& s)
{
*this = std::string(s.begin(), s.end());
return (*this);
}
inline application_message& operator=(const char8_t* const s)
{
*this = std::u8string_view{ s };
return (*this);
}
inline bool operator==(const std::u8string & s) noexcept { return (data_ == std::string(s.begin(), s.end())); }
inline bool operator==(const std::u8string_view & s) noexcept { return (data_ == std::string(s.begin(), s.end())); }
inline bool operator!=(const std::u8string & s) noexcept { return (data_ != std::string(s.begin(), s.end())); }
inline bool operator!=(const std::u8string_view & s) noexcept { return (data_ != std::string(s.begin(), s.end())); }
inline application_message& operator+=(const std::u8string & s)
{
*this += std::string{ s.begin(), s.end() };
return (*this);
}
inline application_message& operator+=(const std::u8string_view & s)
{
*this += std::string{ s.begin(), s.end() };
return (*this);
}
inline application_message& operator+=(const char8_t* const s)
{
*this += std::u8string_view{ s };
return (*this);
}
#endif
inline std::size_t required_size() const noexcept
{
return (data_.size());
}
inline std::size_t size() const noexcept
{
return (data_.size());
}
inline bool empty() const noexcept { return data_.empty(); }
inline std::string data () const { return data_; }
inline std::string_view data_view() const { return data_; }
inline operator std::string () const { return data_; }
inline operator std::string_view () const { return data_; }
inline application_message& serialize(std::vector<asio::const_buffer>& buffers)
{
buffers.emplace_back(asio::buffer(data_));
return (*this);
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline application_message& serialize(Container& buffer)
{
buffer.insert(buffer.end(), data_.begin(), data_.end());
return (*this);
}
inline application_message& deserialize(std::string_view& data)
{
data_ = data;
data.remove_prefix(data_.size());
//asio2::detail::disable_sso(data_);
return (*this);
}
protected:
std::string data_{};
};
/**
* this class is just used for user application, it is not the part of mqtt protocol.
*/
template<class derived_t>
class user_message_attr
{
//template <class> friend class asio2::detail::mqtt_handler_t;
//template <class> friend class asio2::detail::mqtt_invoker_t;
//template <class> friend class asio2::detail::mqtt_aop_connect;
//template <class> friend class asio2::detail::mqtt_aop_publish;
public:
user_message_attr() = default;
~user_message_attr() = default;
//protected:
inline derived_t& set_send_flag(bool v) { send_flag_ = v; return (static_cast<derived_t&>(*this)); }
inline bool get_send_flag( ) const { return send_flag_; }
protected:
bool send_flag_ = true;
};
/**
* Fixed header format
* Each MQTT Control Packet contains a Fixed Header as shown below.
*
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901021
*/
template<std::uint8_t Version>
class fixed_header : public user_message_attr<fixed_header<Version>>
{
public:
union type_and_flags
{
one_byte_integer byte{ 0 }; // the whole byte
#if ASIO2_ENDIAN_BIG_BYTE
struct
{
std::uint8_t type : 4; // MQTT Control Packet type
bool dup : 1; // Duplicate delivery of a PUBLISH Control Packet
std::uint8_t qos : 2; // PUBLISH Quality of Service, 0, 1 or 2
bool retain : 1; // PUBLISH Retain flag
} bits;
struct
{
std::uint8_t type : 4; // MQTT Control Packet type
std::uint8_t bit3 : 1;
std::uint8_t bit2 : 1;
std::uint8_t bit1 : 1;
std::uint8_t bit0 : 1;
} reserved;
#else
struct
{
bool retain : 1; // PUBLISH Retain flag
std::uint8_t qos : 2; // PUBLISH Quality of Service, 0, 1 or 2
bool dup : 1; // Duplicate delivery of a PUBLISH Control Packet
std::uint8_t type : 4; // MQTT Control Packet type
} bits;
struct
{
std::uint8_t bit0 : 1;
std::uint8_t bit1 : 1;
std::uint8_t bit2 : 1;
std::uint8_t bit3 : 1;
std::uint8_t type : 4; // MQTT Control Packet type
} reserved;
#endif
};
public:
fixed_header()
{
}
explicit fixed_header(control_packet_type type)
{
type_and_flags_.bits.type = asio2::detail::to_underlying(type);
}
constexpr std::uint8_t version() const noexcept { return Version; }
inline std::size_t required_size() const
{
return (type_and_flags_.byte.required_size() + remain_length_.required_size());
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline fixed_header& serialize(Container& buffer)
{
type_and_flags_.byte.serialize(buffer);
remain_length_ .serialize(buffer);
return (*this);
}
inline fixed_header& deserialize(std::string_view& data)
{
type_and_flags_.byte.deserialize(data);
if (asio2::get_last_error())
return (*this);
remain_length_ .deserialize(data);
return (*this);
}
inline control_packet_type packet_type () const { return static_cast<control_packet_type>(type_and_flags_.bits.type ); }
inline control_packet_type message_type () const { return packet_type(); }
inline std::int32_t remain_length() const { return remain_length_.value(); }
protected:
type_and_flags type_and_flags_{ };
// The Remaining Length is a Variable Byte Integer that represents the number of bytes remaining
// within the current Control Packet, including data in the Variable Header and the Payload.
// The Remaining Length does not include the bytes used to encode the Remaining Length.
// The packet size is the total number of bytes in an MQTT Control Packet, this is equal to the
// length of the Fixed Header plus the Remaining Length.
variable_byte_integer remain_length_ { 0 };
};
/**
* Used to init a variant mqtt::message to empty.
*/
class nullmsg : public fixed_header<asio2::detail::to_underlying(mqtt::version::v4)>
{
public:
nullmsg() : fixed_header(control_packet_type::reserved) {}
inline nullmsg& update_remain_length() { return (*this); }
};
/**
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901248
*/
class subscription
{
public:
subscription() = default;
/*
* the "no_local,rap,retain_handling" are only valid in mqtt 5.0
*/
template<class String, class QosOrInt>
explicit subscription(String&& topic_filter,
QosOrInt qos, bool no_local = false, bool rap = false,
retain_handling_type retain_handling = retain_handling_type::send
)
: topic_filter_(std::forward<String>(topic_filter))
{
option_.bits.qos = static_cast<std::uint8_t>(qos);
option_.bits.nl = no_local;
option_.bits.rap = rap;
option_.bits.retain_handling = asio2::detail::to_underlying(retain_handling);
}
inline std::size_t required_size() const
{
return (topic_filter_.required_size() + option_.byte.required_size());
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline subscription& serialize(Container& buffer)
{
topic_filter_.serialize(buffer);
option_.byte .serialize(buffer);
return (*this);
}
inline subscription& deserialize(std::string_view& data)
{
topic_filter_.deserialize(data);
if (asio2::get_last_error())
return (*this);
option_.byte .deserialize(data);
return (*this);
}
inline qos_type qos () const { return static_cast<qos_type>(option_.bits.qos); }
inline bool no_local () const { return option_.bits.nl ; }
inline bool rap () const { return option_.bits.rap ; }
inline retain_handling_type retain_handling() const { return static_cast<retain_handling_type>(option_.bits.retain_handling); }
inline std::string_view share_name () const
{
auto[name, filter] = parse_topic_filter(topic_filter_.data_view());
std::ignore = filter;
return name;
}
inline std::string_view topic_filter () const
{
auto[name, filter] = parse_topic_filter(topic_filter_.data_view());
std::ignore = name;
return filter;
}
template<class QosOrInt>
inline subscription& qos (QosOrInt v) { option_.bits.qos = static_cast<std::uint8_t>(v); return (*this); }
inline subscription& no_local (bool v) { option_.bits.nl = v; return (*this); }
inline subscription& rap (bool v) { option_.bits.rap = v; return (*this); }
inline subscription& retain_handling(std::uint8_t v) { option_.bits.retain_handling = v; return (*this); }
template<class String>
inline subscription& topic_filter (String&& v) { topic_filter_ = std::forward<String>(v) ; return (*this); }
inline subscription& retain_handling(retain_handling_type v) { option_.bits.retain_handling = asio2::detail::to_underlying(v); return (*this); }
protected:
// The Payload of a SUBSCRIBE packet contains a list of Topic Filters indicating the Topics to
// which the Client wants to subscribe. The Topic Filters MUST be a UTF-8 Encoded String
utf8_string topic_filter_{};
// Each Topic Filter is followed by a Subscription Options byte.
union
{
one_byte_integer byte{ 0 }; // the whole byte
#if ASIO2_ENDIAN_BIG_BYTE
struct
{
std::uint8_t reserved : 2; // reserved for future use
std::uint8_t retain_handling : 2; // Retain Handling option, [Only valid for mqtt 5.0]
bool rap : 1; // Retain As Published option, [Only valid for mqtt 5.0]
bool nl : 1; // No Local option, [Only valid for mqtt 5.0]
std::uint8_t qos : 2; // PUBLISH Quality of Service, 0, 1 or 2
} bits;
#else
struct
{
std::uint8_t qos : 2; // PUBLISH Quality of Service, 0, 1 or 2
bool nl : 1; // No Local option, [Only valid for mqtt 5.0]
bool rap : 1; // Retain As Published option, [Only valid for mqtt 5.0]
std::uint8_t retain_handling : 2; // Retain Handling option, [Only valid for mqtt 5.0]
std::uint8_t reserved : 2; // reserved for future use
} bits;
#endif
} option_{ };
};
/**
* https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168
*/
class subscriptions_set
{
protected:
template<class... Args>
static constexpr bool _is_subscription() noexcept
{
if constexpr (sizeof...(Args) == std::size_t(0))
return true;
else
return ((std::is_same_v<asio2::detail::remove_cvref_t<Args>, mqtt::subscription>) && ...);
}
public:
subscriptions_set()
{
}
template<class... Subscriptions, std::enable_if_t<_is_subscription<Subscriptions...>(), int> = 0>
explicit subscriptions_set(Subscriptions&&... Subscripts)
{
set(std::forward<Subscriptions>(Subscripts)...);
}
subscriptions_set(subscriptions_set&&) noexcept = default;
subscriptions_set(subscriptions_set const&) = default;
subscriptions_set& operator=(subscriptions_set&&) noexcept = default;
subscriptions_set& operator=(subscriptions_set const&) = default;
template<class... Subscriptions, std::enable_if_t<_is_subscription<Subscriptions...>(), int> = 0>
inline subscriptions_set& set(Subscriptions&&... Subscripts)
{
data_.clear();
(data_.emplace_back(std::forward<Subscriptions>(Subscripts)), ...);
return (*this);
}
template<class... Subscriptions, std::enable_if_t<_is_subscription<Subscriptions...>(), int> = 0>
inline subscriptions_set& add(Subscriptions&&... Subscripts)
{
(data_.emplace_back(std::forward<Subscriptions>(Subscripts)), ...);
return (*this);
}
inline subscriptions_set& erase(std::string_view topic_filter)
{
for (auto it = data_.begin(); it != data_.end();)
{
if (it->topic_filter() == topic_filter)
it = data_.erase(it);
else
++it;
}
return (*this);
}
inline subscriptions_set& clear()
{
data_.clear();
return (*this);
}
inline std::size_t required_size() const
{
std::size_t r = 0;
for (auto& v : data_)
{
r += v.required_size();
}
return r;
}
inline std::size_t count() const noexcept
{
return data_.size();
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline subscriptions_set& serialize(Container& buffer)
{
for (auto& v : data_)
{
v.serialize(buffer);
}
return (*this);
}
inline subscriptions_set& deserialize(std::string_view& data)
{
while (!data.empty())
{
subscription s{};
s.deserialize(data);
if (asio2::get_last_error())
return (*this);
data_.emplace_back(std::move(s));
}
return (*this);
}
inline std::vector<subscription>& data() { return data_; }
inline std::vector<subscription> const& data() const { return data_; }
protected:
std::vector<subscription> data_{};
};
class one_byte_integer_set
{
protected:
template<class... Args>
static constexpr bool _is_integral() noexcept
{
if constexpr (sizeof...(Args) == std::size_t(0))
return true;
else
return ((std::is_integral_v<asio2::detail::remove_cvref_t<Args>>) && ...);
}
public:
one_byte_integer_set()
{
}
template<class... Integers, std::enable_if_t<_is_integral<Integers...>(), int> = 0>
explicit one_byte_integer_set(Integers... Ints)
{
set(Ints...);
}
one_byte_integer_set(one_byte_integer_set&&) noexcept = default;
one_byte_integer_set(one_byte_integer_set const&) = default;
one_byte_integer_set& operator=(one_byte_integer_set&&) noexcept = default;
one_byte_integer_set& operator=(one_byte_integer_set const&) = default;
template<class... Integers, std::enable_if_t<_is_integral<Integers...>(), int> = 0>
inline one_byte_integer_set& set(Integers... Ints)
{
data_.clear();
(data_.emplace_back(static_cast<one_byte_integer::value_type>(Ints)), ...);
return (*this);
}
template<class... Integers, std::enable_if_t<_is_integral<Integers...>(), int> = 0>
inline one_byte_integer_set& add(Integers... Ints)
{
(data_.emplace_back(static_cast<one_byte_integer::value_type>(Ints)), ...);
return (*this);
}
inline one_byte_integer_set& erase(std::size_t index)
{
if (index < data_.size())
data_.erase(std::next(data_.begin(), index));
return (*this);
}
inline one_byte_integer_set& clear()
{
data_.clear();
return (*this);
}
inline std::size_t required_size() const
{
return (data_.empty() ? 0 : data_.size() * data_.front().required_size());
}
inline std::size_t count() const noexcept
{
return data_.size();
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline one_byte_integer_set& serialize(Container& buffer)
{
for (auto& v : data_)
{
v.serialize(buffer);
}
return (*this);
}
inline one_byte_integer_set& deserialize(std::string_view& data)
{
while (!data.empty())
{
one_byte_integer v{};
v.deserialize(data);
if (asio2::get_last_error())
return (*this);
data_.emplace_back(std::move(v));
}
return (*this);
}
inline std::vector<one_byte_integer>& data() { return data_; }
inline std::vector<one_byte_integer> const& data() const { return data_; }
inline one_byte_integer::value_type at(std::size_t i) const
{
return static_cast<one_byte_integer::value_type>(data_.size() > i ? data_[i].value() : -1);
}
protected:
std::vector<one_byte_integer> data_{};
};
class utf8_string_set
{
public:
utf8_string_set()
{
}
template<class... Strings>
explicit utf8_string_set(Strings&&... Strs)
{
set(std::forward<Strings>(Strs)...);
}
utf8_string_set(utf8_string_set&&) noexcept = default;
utf8_string_set(utf8_string_set const&) = default;
utf8_string_set& operator=(utf8_string_set&&) noexcept = default;
utf8_string_set& operator=(utf8_string_set const&) = default;
template<class... Strings>
inline utf8_string_set& set(Strings&&... Strs)
{
data_.clear();
(data_.emplace_back(std::forward<Strings>(Strs)), ...);
return (*this);
}
template<class... Strings>
inline utf8_string_set& add(Strings... Strs)
{
(data_.emplace_back(std::forward<Strings>(Strs)), ...);
return (*this);
}
inline utf8_string_set& erase(std::string_view s)
{
for (auto it = data_.begin(); it != data_.end();)
{
if (it->data_view() == s)
it = data_.erase(it);
else
++it;
}
return (*this);
}
inline utf8_string_set& clear()
{
data_.clear();
return (*this);
}
inline std::size_t required_size() const
{
std::size_t r = 0;
for (auto& v : data_)
{
r += v.required_size();
}
return r;
}
inline std::size_t count() const noexcept
{
return data_.size();
}
/*
* The Container is usually a std::string, std::vector<char>, ...
*/
template<class Container>
inline utf8_string_set& serialize(Container& buffer)
{
for (auto& v : data_)
{
v.serialize(buffer);
}
return (*this);
}
inline utf8_string_set& deserialize(std::string_view& data)
{
while (!data.empty())
{
utf8_string s{};
s.deserialize(data);
if (asio2::get_last_error())
return (*this);
data_.emplace_back(std::move(s));
}
return (*this);
}
inline std::vector<utf8_string>& data() { return data_; }
inline std::vector<utf8_string> const& data() const { return data_; }
/**
* function signature : void(mqtt::utf8_string& str)
*/
template<class Function>
inline utf8_string_set& for_each(Function&& f)
{
for (auto& v : data_)
{
f(v);
}
return (*this);
}
protected:
std::vector<utf8_string> data_{};
};
namespace
{
using iterator = asio::buffers_iterator<asio::streambuf::const_buffers_type>;
using diff_type = typename iterator::difference_type;
std::pair<iterator, bool> mqtt_match_role(iterator begin, iterator end)
{
for (iterator p = begin; p < end;)
{
// Fixed header : at least 2 bytes
if (end - p < static_cast<diff_type>(2))
break;
p += 1;
// Remaining Length
std::int32_t remain_length = 0, remain_bytes = 0;
auto[value, bytes] = decode_variable_byte_integer(std::string_view{ reinterpret_cast<
std::string_view::const_pointer>(p.operator->()), static_cast<std::size_t>(end - p) });
if (error_code ec = asio2::get_last_error(); ec)
{
// need more data
if (ec == asio::error::no_buffer_space)
{
return std::pair(begin, false);
}
// illegal packet
else
{
return std::pair(begin, true);
}
}
remain_length = value;
remain_bytes = bytes;
p += remain_bytes;
// Variable Header, Payload
if (end - p < static_cast<diff_type>(remain_length))
break;
return std::pair(p + static_cast<diff_type>(remain_length), true);
}
return std::pair(begin, false);
}
}
// Write the text for a one_byte_integer variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, one_byte_integer v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a two_byte_integer variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, two_byte_integer v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a four_byte_integer variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, four_byte_integer v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a variable_byte_integer variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, variable_byte_integer v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a utf8_string variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, utf8_string& v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a binary_data variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, binary_data& v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a utf8_string_pair variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, utf8_string_pair& v)
{
std::string s;
v.serialize(s);
return os << std::move(s);
}
// Write the text for a application_message variable to an output stream.
inline std::ostream& operator<<(std::ostream& os, application_message& v)
{
return os << v.data_view();
}
}
#endif // !__ASIO2_MQTT_CORE_HPP__