rpc_recv_op.hpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_RPC_RECV_OP_HPP__
  11. #define __ASIO2_RPC_RECV_OP_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <memory>
  16. #include <future>
  17. #include <utility>
  18. #include <string_view>
  19. #include <asio2/base/error.hpp>
  20. #include <asio2/base/detail/ecs.hpp>
  21. #include <asio2/rpc/detail/rpc_serialization.hpp>
  22. #include <asio2/rpc/detail/rpc_protocol.hpp>
  23. #include <asio2/rpc/detail/rpc_invoker.hpp>
  24. namespace asio2::detail
  25. {
  26. template<class derived_t, class args_t>
  27. class rpc_recv_op
  28. {
  29. public:
  30. /**
  31. * @brief constructor
  32. */
  33. rpc_recv_op() noexcept {}
  34. /**
  35. * @brief destructor
  36. */
  37. ~rpc_recv_op() = default;
  38. protected:
  39. inline void _rpc_handle_failed_request(
  40. std::shared_ptr<derived_t>& this_ptr, rpc::error e, rpc_serializer& sr, rpc_header& head)
  41. {
  42. ASIO2_ASSERT(static_cast<derived_t&>(*this).io_->running_in_this_thread());
  43. if (head.id() != static_cast<rpc_header::id_type>(0))
  44. {
  45. derived_t& derive = static_cast<derived_t&>(*this);
  46. error_code ec = rpc::make_error_code(e);
  47. sr.reset();
  48. sr << head;
  49. sr << ec;
  50. derive.internal_async_send(this_ptr, sr.str());
  51. }
  52. }
  53. template<typename C>
  54. void _rpc_handle_recv(
  55. std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, std::string_view data)
  56. {
  57. detail::ignore_unused(ecs);
  58. derived_t& derive = static_cast<derived_t&>(*this);
  59. ASIO2_ASSERT(derive.is_started());
  60. rpc_serializer & sr = derive.serializer_;
  61. rpc_deserializer & dr = derive.deserializer_;
  62. rpc_header & head = derive.header_;
  63. try
  64. {
  65. dr.reset(data);
  66. dr >> head;
  67. }
  68. catch (cereal::exception const&)
  69. {
  70. derive._do_disconnect(rpc::make_error_code(rpc::error::illegal_data), this_ptr);
  71. return;
  72. }
  73. // bug fixed : illegal data being parsed into string object fails to allocate
  74. // memory due to excessively long data
  75. catch (std::bad_alloc const&)
  76. {
  77. derive._do_disconnect(rpc::make_error_code(rpc::error::illegal_data), this_ptr);
  78. return;
  79. }
  80. catch (std::exception const&)
  81. {
  82. derive._do_disconnect(rpc::make_error_code(rpc::error::unspecified_error), this_ptr);
  83. return;
  84. }
  85. if /**/ (head.is_request())
  86. {
  87. head.type(rpc_type_rep);
  88. sr.reset();
  89. sr << head;
  90. auto fn = derive._invoker().find(head.name());
  91. if (fn)
  92. {
  93. // async - return true, sync - return false
  94. // call this function will deserialize data, so it maybe throw some exception,
  95. // and it will call user function inner, the user function maybe throw some
  96. // exception also.
  97. #if !defined(ASIO_NO_EXCEPTIONS) && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  98. try
  99. {
  100. #endif
  101. if ((*fn)(this_ptr, std::addressof(derive), sr, dr))
  102. return;
  103. #if !defined(ASIO_NO_EXCEPTIONS) && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  104. }
  105. catch (cereal::exception const&)
  106. {
  107. derive._rpc_handle_failed_request(this_ptr, rpc::error::invalid_argument, sr, head);
  108. return;
  109. }
  110. catch (system_error const&)
  111. {
  112. derive._rpc_handle_failed_request(this_ptr, rpc::error::unspecified_error, sr, head);
  113. return;
  114. }
  115. catch (std::exception const&)
  116. {
  117. derive._rpc_handle_failed_request(this_ptr, rpc::error::unspecified_error, sr, head);
  118. return;
  119. }
  120. #endif
  121. // The number of parameters passed in when calling rpc function exceeds
  122. // the number of parameters of local function
  123. if (head.id() != static_cast<rpc_header::id_type>(0))
  124. {
  125. if (dr.buffer().in_avail() == 0)
  126. {
  127. derive.internal_async_send(this_ptr, sr.str());
  128. }
  129. else
  130. {
  131. derive._rpc_handle_failed_request(this_ptr, rpc::error::invalid_argument, sr, head);
  132. }
  133. }
  134. }
  135. else
  136. {
  137. if (head.id() != static_cast<rpc_header::id_type>(0))
  138. {
  139. error_code ec = rpc::make_error_code(rpc::error::not_found);
  140. sr << ec;
  141. derive.internal_async_send(this_ptr, sr.str());
  142. }
  143. }
  144. }
  145. else if (head.is_response())
  146. {
  147. auto iter = derive.reqs_.find(head.id());
  148. if (iter != derive.reqs_.end())
  149. {
  150. std::function<void(error_code, std::string_view)>& cb = iter->second;
  151. cb(rpc::make_error_code(rpc::error::success), data);
  152. }
  153. }
  154. else
  155. {
  156. derive._do_disconnect(rpc::make_error_code(rpc::error::no_data), this_ptr);
  157. }
  158. }
  159. protected:
  160. };
  161. }
  162. #endif // !__ASIO2_RPC_RECV_OP_HPP__