123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- #ifndef __ASIO2_MQTT_SEND_OP_HPP__
- #define __ASIO2_MQTT_SEND_OP_HPP__
- #if defined(_MSC_VER) && (_MSC_VER >= 1200)
- #pragma once
- #endif
- #include <memory>
- #include <future>
- #include <utility>
- #include <string_view>
- #include <vector>
- #include <variant>
- #include <asio2/base/error.hpp>
- #include <asio2/base/detail/util.hpp>
- #include <asio2/base/detail/buffer_wrap.hpp>
- #include <asio2/mqtt/message.hpp>
- namespace asio2::detail
- {
- template<class derived_t, class args_t>
- class mqtt_send_op
- {
- public:
-
- mqtt_send_op() {}
-
- ~mqtt_send_op() = default;
- protected:
- template<class Data, class Callback>
- inline bool _mqtt_send(Data& data, Callback&& callback)
- {
- derived_t& derive = static_cast<derived_t&>(*this);
-
-
-
-
-
-
- #if defined(_DEBUG) || defined(DEBUG)
- static bool flag = false;
- if (flag == false)
- {
- flag = true;
- std::vector<char> v1{ 'a','b','c' };
- std::string_view sv{ v1.data(),v1.size() };
- std::vector<char> v2 = std::move(v1);
- ASIO2_ASSERT(sv.data() == v2.data());
- }
- #endif
- std::vector<char> binary;
- using data_type = typename detail::remove_cvref_t<Data>;
- if constexpr (std::is_same_v<mqtt::message, data_type>)
- {
- std::visit([&binary](auto& message) mutable
- {
- binary.reserve(message.required_size());
- message.serialize(binary);
- }, data);
- }
- else if constexpr (detail::is_template_instance_of_v<std::variant, data_type>)
- {
- std::visit([&binary](auto& message) mutable
- {
- binary.reserve(message.required_size());
- message.serialize(binary);
- }, data);
- }
- else
- {
- binary.reserve(data.required_size());
- data.serialize(binary);
- }
- auto buffer = asio::buffer(binary);
- #if defined(_DEBUG) || defined(DEBUG)
- ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
- derive.post_send_counter_++;
- #endif
- asio::async_write(derive.stream(), buffer,
- make_allocator(derive.wallocator(), [&derive,
- binary = std::move(binary), callback = std::forward<Callback>(callback)]
- (const error_code& ec, std::size_t bytes_sent) mutable
- {
- #if defined(_DEBUG) || defined(DEBUG)
- derive.post_send_counter_--;
- #endif
- set_last_error(ec);
- callback(ec, bytes_sent);
- if (ec)
- {
-
- if (derive.state_ == state_t::started)
- {
- derive._do_disconnect(ec, derive.selfptr());
- }
- }
- }));
- return true;
- }
- };
- }
- #endif
|