mqtt_send_op.hpp 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_MQTT_SEND_OP_HPP__
  11. #define __ASIO2_MQTT_SEND_OP_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <memory>
  16. #include <future>
  17. #include <utility>
  18. #include <string_view>
  19. #include <vector>
  20. #include <variant>
  21. #include <asio2/base/error.hpp>
  22. #include <asio2/base/detail/util.hpp>
  23. #include <asio2/base/detail/buffer_wrap.hpp>
  24. #include <asio2/mqtt/message.hpp>
  25. namespace asio2::detail
  26. {
  27. template<class derived_t, class args_t>
  28. class mqtt_send_op
  29. {
  30. public:
  31. /**
  32. * @brief constructor
  33. */
  34. mqtt_send_op() {}
  35. /**
  36. * @brief destructor
  37. */
  38. ~mqtt_send_op() = default;
  39. protected:
  40. template<class Data, class Callback>
  41. inline bool _mqtt_send(Data& data, Callback&& callback)
  42. {
  43. derived_t& derive = static_cast<derived_t&>(*this);
  44. // why don't use std::string ?
  45. // beacuse std::string has a SSO(Small String Optimization) mechanism
  46. // https://stackoverflow.com/questions/34788789/disable-stdstrings-sso
  47. // std::string str;
  48. // str.reserve(sizeof(str) + 1);
  49. // Test whether the vector has SSO mechanism
  50. #if defined(_DEBUG) || defined(DEBUG)
  51. static bool flag = false;
  52. if (flag == false)
  53. {
  54. flag = true;
  55. std::vector<char> v1{ 'a','b','c' };
  56. std::string_view sv{ v1.data(),v1.size() };
  57. std::vector<char> v2 = std::move(v1);
  58. ASIO2_ASSERT(sv.data() == v2.data());
  59. }
  60. #endif
  61. std::vector<char> binary;
  62. using data_type = typename detail::remove_cvref_t<Data>;
  63. if constexpr (std::is_same_v<mqtt::message, data_type>)
  64. {
  65. std::visit([&binary](auto& message) mutable
  66. {
  67. binary.reserve(message.required_size());
  68. message.serialize(binary);
  69. }, data);
  70. }
  71. else if constexpr (detail::is_template_instance_of_v<std::variant, data_type>)
  72. {
  73. std::visit([&binary](auto& message) mutable
  74. {
  75. binary.reserve(message.required_size());
  76. message.serialize(binary);
  77. }, data);
  78. }
  79. else
  80. {
  81. binary.reserve(data.required_size());
  82. data.serialize(binary);
  83. }
  84. auto buffer = asio::buffer(binary);
  85. #if defined(_DEBUG) || defined(DEBUG)
  86. ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
  87. derive.post_send_counter_++;
  88. #endif
  89. asio::async_write(derive.stream(), buffer,
  90. make_allocator(derive.wallocator(), [&derive,
  91. binary = std::move(binary), callback = std::forward<Callback>(callback)]
  92. (const error_code& ec, std::size_t bytes_sent) mutable
  93. {
  94. #if defined(_DEBUG) || defined(DEBUG)
  95. derive.post_send_counter_--;
  96. #endif
  97. set_last_error(ec);
  98. callback(ec, bytes_sent);
  99. if (ec)
  100. {
  101. // must stop, otherwise re-sending will cause body confusion
  102. if (derive.state_ == state_t::started)
  103. {
  104. derive._do_disconnect(ec, derive.selfptr());
  105. }
  106. }
  107. }));
  108. return true;
  109. }
  110. };
  111. }
  112. #endif // !__ASIO2_MQTT_SEND_OP_HPP__