rpc_invoker.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_RPC_INVOKER_HPP__
  11. #define __ASIO2_RPC_INVOKER_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <cstdint>
  16. #include <memory>
  17. #include <chrono>
  18. #include <functional>
  19. #include <atomic>
  20. #include <string>
  21. #include <string_view>
  22. #include <queue>
  23. #include <any>
  24. #include <future>
  25. #include <tuple>
  26. #include <unordered_map>
  27. #include <type_traits>
  28. #include <optional>
  29. #include <asio2/base/iopool.hpp>
  30. #include <asio2/base/define.hpp>
  31. #include <asio2/base/detail/function_traits.hpp>
  32. #include <asio2/base/detail/util.hpp>
  33. #include <asio2/base/detail/shared_mutex.hpp>
  34. #include <asio2/rpc/detail/rpc_serialization.hpp>
  35. #include <asio2/rpc/detail/rpc_protocol.hpp>
  36. #include <asio2/util/string.hpp>
  namespace asio2::detail
  {
      // Forward declaration: the rpc helper types in this header name the
      // invoker as a friend before its full definition is available.
      template<class caller_t, class args_t> class rpc_invoker_t;
  }
  42. namespace asio2::rpc
  43. {
  44. template<class T>
  45. class response_defer
  46. {
  47. template<class, class> friend class asio2::detail::rpc_invoker_t;
  48. public:
  49. response_defer() = default;
  50. ~response_defer()
  51. {
  52. ASIO2_ASSERT(f_);
  53. if (f_) { f_(); }
  54. }
  55. template<class V>
  56. inline void set_value(V&& v) { v_ = std::forward<V>(v); }
  57. protected:
  58. template<class F>
  59. inline void _bind (F&& f) { f_ = std::forward<F>(f); }
  60. protected:
  61. std::optional<T> v_{};
  62. std::function<void()> f_{};
  63. };
  64. template<class T>
  65. class future
  66. {
  67. template<class, class> friend class asio2::detail::rpc_invoker_t;
  68. public:
  69. future() = delete;
  70. future(std::shared_ptr<response_defer<T>> defer) noexcept : defer_(std::move(defer))
  71. {
  72. }
  73. ~future() = default;
  74. future(future&&) noexcept = default;
  75. future(future const&) = default;
  76. future& operator=(future&&) noexcept = default;
  77. future& operator=(future const&) = default;
  78. protected:
  79. std::shared_ptr<response_defer<T>> defer_{};
  80. };
  81. template<class T>
  82. class promise
  83. {
  84. template<class, class> friend class asio2::detail::rpc_invoker_t;
  85. public:
  86. promise() = default;
  87. ~promise() = default;
  88. promise(promise&&) noexcept = default;
  89. promise(promise const&) = default;
  90. promise& operator=(promise&&) noexcept = default;
  91. promise& operator=(promise const&) = default;
  92. inline future<T> get_future() const noexcept { return future<T>{ defer_ }; }
  93. template<class V>
  94. inline void set_value(V&& v) { defer_->set_value(std::forward<V>(v)); }
  95. protected:
  96. std::shared_ptr<response_defer<T>> defer_ = std::make_shared<response_defer<T>>();
  97. };
  98. //---------------------------------------------------------------------------------------------
  99. // specialize for void
  100. //---------------------------------------------------------------------------------------------
  101. template<>
  102. class response_defer<void>
  103. {
  104. template<class, class> friend class asio2::detail::rpc_invoker_t;
  105. public:
  106. response_defer() = default;
  107. ~response_defer()
  108. {
  109. ASIO2_ASSERT(f_);
  110. if (f_) { f_(); }
  111. }
  112. template<typename = void>
  113. inline void set_value() { v_ = '0'; }
  114. protected:
  115. template<class F>
  116. inline void _bind (F&& f) { f_ = std::forward<F>(f); }
  117. protected:
  118. std::optional<char> v_{};
  119. std::function<void()> f_{};
  120. };
  121. template<>
  122. class future<void>
  123. {
  124. template<class, class> friend class asio2::detail::rpc_invoker_t;
  125. public:
  126. future() = delete;
  127. future(std::shared_ptr<response_defer<void>> defer) noexcept : defer_(std::move(defer))
  128. {
  129. }
  130. ~future() = default;
  131. future(future&&) noexcept = default;
  132. future(future const&) = default;
  133. future& operator=(future&&) noexcept = default;
  134. future& operator=(future const&) = default;
  135. protected:
  136. std::shared_ptr<response_defer<void>> defer_{};
  137. };
  138. template<>
  139. class promise<void>
  140. {
  141. template<class, class> friend class asio2::detail::rpc_invoker_t;
  142. public:
  143. promise() = default;
  144. ~promise() = default;
  145. promise(promise&&) noexcept = default;
  146. promise(promise const&) = default;
  147. promise& operator=(promise&&) noexcept = default;
  148. promise& operator=(promise const&) = default;
  149. inline future<void> get_future() const noexcept { return future<void>{ defer_ }; }
  150. template<typename = void>
  151. inline void set_value() { defer_->set_value(); }
  152. protected:
  153. std::shared_ptr<response_defer<void>> defer_ = std::make_shared<response_defer<void>>();
  154. };
  155. }
  156. namespace asio2::detail
  157. {
  /**
   * Maps a function's declared return type to the type used to hold its
   * result: T with any reference and top-level cv-qualifiers removed.
   */
  template<class T>
  struct rpc_result_t
  {
      using type = typename std::remove_cv<typename std::remove_reference<T>::type>::type;
  };

  /// void cannot be stored, so an 8-bit integer stands in for it.
  template<>
  struct rpc_result_t<void>
  {
      using type = std::int8_t;
  };
  168. template<class caller_t, class args_t>
  169. class rpc_invoker_t
  170. {
  171. protected:
  172. struct dummy {};
  173. public:
  174. using self = rpc_invoker_t<caller_t, args_t>;
  175. using fntype = std::function<
  176. bool(std::shared_ptr<caller_t>&, caller_t*, rpc_serializer&, rpc_deserializer&)>;
  177. /**
  178. * @brief constructor
  179. */
  180. rpc_invoker_t() = default;
  181. /**
  182. * @brief destructor
  183. */
  184. ~rpc_invoker_t() = default;
  185. rpc_invoker_t(rpc_invoker_t&& o) noexcept : rpc_invokers_(std::move(o.rpc_invokers_))
  186. {
  187. }
  188. rpc_invoker_t(rpc_invoker_t const& o) : rpc_invokers_(o.rpc_invokers_)
  189. {
  190. }
  191. rpc_invoker_t& operator=(rpc_invoker_t&& o) noexcept
  192. {
  193. this->rpc_invokers_ = std::move(o.rpc_invokers_);
  194. }
  195. rpc_invoker_t& operator=(rpc_invoker_t const& o)
  196. {
  197. this->rpc_invokers_ = o.rpc_invokers_;
  198. }
  199. /**
  200. * @brief bind a rpc function
  201. * @param name - Function name in string format.
  202. * @param fun - Function object.
  203. * @param obj - A pointer or reference to a class object, this parameter can be none.
  204. * if fun is nonmember function, the obj param must be none, otherwise the obj must be the
  205. * the class object's pointer or reference.
  206. */
  207. template<class F, class ...C>
  208. inline self& bind(std::string name, F&& fun, C&&... obj)
  209. {
  210. asio2::trim_both(name);
  211. ASIO2_ASSERT(!name.empty());
  212. if (name.empty())
  213. return (*this);
  214. #if defined(_DEBUG) || defined(DEBUG)
  215. {
  216. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  217. asio2::shared_locker guard(this->rpc_invoker_mutex_);
  218. #endif
  219. ASIO2_ASSERT(this->rpc_invokers_.find(name) == this->rpc_invokers_.end());
  220. }
  221. #endif
  222. this->_bind(std::move(name), std::forward<F>(fun), std::forward<C>(obj)...);
  223. return (*this);
  224. }
  225. /**
  226. * @brief unbind a rpc function
  227. */
  228. inline self& unbind(std::string const& name)
  229. {
  230. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  231. asio2::unique_locker guard(this->rpc_invoker_mutex_);
  232. #endif
  233. this->rpc_invokers_.erase(name);
  234. return (*this);
  235. }
  236. /**
  237. * @brief find binded rpc function by name
  238. */
  239. inline std::shared_ptr<fntype> find(std::string const& name)
  240. {
  241. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  242. asio2::shared_locker guard(this->rpc_invoker_mutex_);
  243. #endif
  244. if (auto iter = this->rpc_invokers_.find(name); iter != this->rpc_invokers_.end())
  245. return iter->second;
  246. return nullptr;
  247. }
  248. protected:
  249. inline self& _invoker() noexcept { return (*this); }
  250. inline self const& _invoker() const noexcept { return (*this); }
  251. template<class F>
  252. inline void _bind(std::string name, F f)
  253. {
  254. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  255. asio2::unique_locker guard(this->rpc_invoker_mutex_);
  256. #endif
  257. this->rpc_invokers_[std::move(name)] = std::make_shared<fntype>(std::bind(&self::template _proxy<F, dummy>,
  258. this, std::move(f), nullptr,
  259. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
  260. }
  261. template<class F, class C>
  262. inline void _bind(std::string name, F f, C& c)
  263. {
  264. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  265. asio2::unique_locker guard(this->rpc_invoker_mutex_);
  266. #endif
  267. this->rpc_invokers_[std::move(name)] = std::make_shared<fntype>(std::bind(&self::template _proxy<F, C>,
  268. this, std::move(f), std::addressof(c),
  269. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
  270. }
  271. template<class F, class C>
  272. inline void _bind(std::string name, F f, C* c)
  273. {
  274. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  275. asio2::unique_locker guard(this->rpc_invoker_mutex_);
  276. #endif
  277. this->rpc_invokers_[std::move(name)] = std::make_shared<fntype>(std::bind(&self::template _proxy<F, C>,
  278. this, std::move(f), c,
  279. std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
  280. }
  281. template<class F, class C>
  282. inline bool _proxy(F& f, C* c, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  283. rpc_serializer& sr, rpc_deserializer& dr)
  284. {
  285. using fun_traits_type = function_traits<F>;
  286. return _argc_proxy<fun_traits_type::argc>(f, c, caller_ptr, caller, sr, dr);
  287. }
  288. template<std::size_t Argc, class F, class C>
  289. typename std::enable_if_t<Argc == 0, bool>
  290. inline _argc_proxy(F& f, C* c, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  291. rpc_serializer& sr, rpc_deserializer& dr)
  292. {
  293. using fun_traits_type = function_traits<F>;
  294. using fun_args_tuple = typename fun_traits_type::pod_tuple_type;
  295. using fun_ret_type = typename fun_traits_type::return_type;
  296. fun_args_tuple tp;
  297. detail::for_each_tuple(tp, [&dr](auto& elem) mutable
  298. {
  299. dr >> elem;
  300. });
  301. return _invoke<fun_ret_type>(f, c, caller_ptr, caller, sr, dr, std::move(tp));
  302. }
  303. template<std::size_t Argc, class F, class C>
  304. typename std::enable_if_t<Argc != 0, bool>
  305. inline _argc_proxy(F& f, C* c, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  306. rpc_serializer& sr, rpc_deserializer& dr)
  307. {
  308. detail::ignore_unused(caller);
  309. using fun_traits_type = function_traits<F>;
  310. using fun_args_tuple = typename fun_traits_type::pod_tuple_type;
  311. using fun_ret_type = typename fun_traits_type::return_type;
  312. using arg0_type = typename std::remove_cv_t<std::remove_reference_t<
  313. typename fun_traits_type::template args<0>::type>>;
  314. if constexpr /**/ (std::is_same_v<arg0_type, std::shared_ptr<caller_t>>)
  315. {
  316. auto tp = _body_args_tuple((fun_args_tuple*)0);
  317. detail::for_each_tuple(tp, [&dr](auto& elem) mutable
  318. {
  319. dr >> elem;
  320. });
  321. auto tp_new = std::tuple_cat(std::tuple<std::shared_ptr<caller_t>&>(caller_ptr), tp);
  322. return _invoke<fun_ret_type>(f, c, caller_ptr, caller, sr, dr, std::move(tp_new));
  323. }
  324. else if constexpr (std::is_same_v<arg0_type, caller_t>)
  325. {
  326. auto tp = _body_args_tuple((fun_args_tuple*)0);
  327. detail::for_each_tuple(tp, [&dr](auto& elem) mutable
  328. {
  329. dr >> elem;
  330. });
  331. auto tp_new = std::tuple_cat(std::tuple<caller_t&>(*caller), tp);
  332. return _invoke<fun_ret_type>(f, c, caller_ptr, caller, sr, dr, std::move(tp_new));
  333. }
  334. else
  335. {
  336. fun_args_tuple tp;
  337. detail::for_each_tuple(tp, [&dr](auto& elem) mutable
  338. {
  339. dr >> elem;
  340. });
  341. return _invoke<fun_ret_type>(f, c, caller_ptr, caller, sr, dr, std::move(tp));
  342. }
  343. }
  /**
   * Returns a value-initialized tuple holding every element type of
   * std::tuple<Args...> except the first. The pointer argument is used for
   * type deduction only. Requires at least one element type.
   */
  template<typename... Args>
  inline decltype(auto) _body_args_tuple(std::tuple<Args...>* tp)
  {
      using tail_indices = std::make_index_sequence<sizeof...(Args) - 1>;

      return _body_args_tuple_impl(tail_indices{}, tp);
  }
  /**
   * For each index I in the sequence, picks element type I + 1 of
   * std::tuple<Args...> and returns a value-initialized tuple of those types.
   * The pointer argument is used for type deduction only.
   */
  template<std::size_t... I, typename... Args>
  inline decltype(auto) _body_args_tuple_impl(std::index_sequence<I...>, std::tuple<Args...>*) noexcept
  {
      using source_t = std::tuple<Args...>;
      using tail_t   = std::tuple<std::tuple_element_t<I + 1, source_t>...>;

      return tail_t{};
  }
  354. template<typename R, typename F, typename C>
  355. inline bool _invoke_with_future(F& f, C* c, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  356. rpc_serializer& sr, rpc_deserializer& dr, typename rpc_result_t<R>::type r)
  357. {
  358. detail::ignore_unused(f, c, caller_ptr, caller, sr, dr);
  359. error_code ec = rpc::make_error_code(rpc::error::success);
  360. if (dr.buffer().in_avail() != 0)
  361. {
  362. ec = rpc::make_error_code(rpc::error::invalid_argument);
  363. }
  364. auto* defer = r.defer_.get();
  365. detail::io_context_work_guard iocg(caller->io_->context().get_executor());
  366. r.defer_->_bind(
  367. [caller_ptr, caller, &sr, ec, head = caller->header_, defer, iocg = std::move(iocg)]() mutable
  368. {
  369. detail::ignore_unused(caller_ptr, iocg);
  370. if (head.id() == static_cast<rpc_header::id_type>(0))
  371. return;
  372. // the "header_, async_send" should not appear in this "invoker" module, But I thought
  373. // for a long time and couldn't find of a good method to solve this problem.
  374. // the operator for "sr" must be in the io_context thread.
  375. asio::dispatch(caller->io_->context(), make_allocator(caller->wallocator(),
  376. [caller_ptr = std::move(caller_ptr), caller, &sr, ec, head = std::move(head),
  377. v = std::move(defer->v_)]
  378. () mutable
  379. {
  380. ASIO2_ASSERT(caller->io_->running_in_this_thread());
  381. if (!caller->is_started())
  382. return;
  383. head.type(rpc_type_rep);
  384. if (v.has_value() == false && (!ec))
  385. {
  386. ec = rpc::make_error_code(rpc::error::no_data);
  387. }
  388. sr.reset();
  389. sr << head;
  390. sr << ec;
  391. #if !defined(ASIO_NO_EXCEPTIONS) && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  392. try
  393. {
  394. #endif
  395. if constexpr (!std::is_same_v<rpc::future<void>, R>)
  396. {
  397. if (!ec)
  398. {
  399. sr << std::move(v.value()); // maybe throw some exception
  400. }
  401. }
  402. else
  403. {
  404. std::ignore = v;
  405. }
  406. caller->internal_async_send(std::move(caller_ptr), sr.str());
  407. #if !defined(ASIO_NO_EXCEPTIONS) && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  408. return; // not exception, return
  409. }
  410. catch (cereal::exception const&)
  411. {
  412. if (!ec) ec = rpc::make_error_code(rpc::error::invalid_argument);
  413. }
  414. catch (std::exception const&)
  415. {
  416. if (!ec) ec = rpc::make_error_code(rpc::error::unspecified_error);
  417. }
  418. // the error_code must not be 0.
  419. ASIO2_ASSERT(ec);
  420. // code run to here, it means that there has some exception.
  421. sr.reset();
  422. sr << head;
  423. sr << ec;
  424. caller->internal_async_send(std::move(caller_ptr), sr.str());
  425. #endif
  426. }));
  427. });
  428. return true;
  429. }
  430. // async - return true, sync - return false
  431. template<typename R, typename F, typename C, typename... Args>
  432. inline bool _invoke(F& f, C* c, std::shared_ptr<caller_t>& caller_ptr, caller_t* caller,
  433. rpc_serializer& sr, rpc_deserializer& dr, std::tuple<Args...>&& tp)
  434. {
  435. detail::ignore_unused(caller_ptr, caller, sr, dr);
  436. if (caller_ptr)
  437. {
  438. detail::get_current_object<std::shared_ptr<caller_t>>() = caller_ptr;
  439. }
  440. else
  441. {
  442. detail::get_current_object<caller_t*>() = caller;
  443. }
  444. typename rpc_result_t<R>::type r = _invoke_impl<R>(f, c,
  445. std::make_index_sequence<sizeof...(Args)>{}, std::move(tp));
  446. if constexpr (detail::is_template_instance_of_v<rpc::future, R>)
  447. {
  448. return _invoke_with_future<R>(f, c, caller_ptr, caller, sr, dr, std::move(r));
  449. }
  450. else if constexpr (!std::is_same_v<R, void>)
  451. {
  452. sr << rpc::make_error_code(rpc::error::success);
  453. sr << r;
  454. return false;
  455. }
  456. else
  457. {
  458. sr << rpc::make_error_code(rpc::error::success);
  459. std::ignore = r;
  460. return false;
  461. }
  462. }
  463. template<typename R, typename F, typename C, std::size_t... I, typename... Args>
  464. typename std::enable_if_t<!std::is_same_v<R, void>, typename rpc_result_t<R>::type>
  465. inline _invoke_impl(F& f, C* c, std::index_sequence<I...>, std::tuple<Args...>&& tp)
  466. {
  467. detail::ignore_unused(c);
  468. if constexpr (std::is_same_v<detail::remove_cvref_t<C>, dummy>)
  469. return f(std::get<I>(std::move(tp))...);
  470. else
  471. return (c->*f)(std::get<I>(std::move(tp))...);
  472. }
  473. template<typename R, typename F, typename C, std::size_t... I, typename... Args>
  474. typename std::enable_if_t<std::is_same_v<R, void>, typename rpc_result_t<R>::type>
  475. inline _invoke_impl(F& f, C* c, std::index_sequence<I...>, std::tuple<Args...>&& tp)
  476. {
  477. detail::ignore_unused(c);
  478. if constexpr (std::is_same_v<detail::remove_cvref_t<C>, dummy>)
  479. f(std::get<I>(std::move(tp))...);
  480. else
  481. (c->*f)(std::get<I>(std::move(tp))...);
  482. return 1;
  483. }
  484. protected:
  485. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  486. mutable asio2::shared_mutexer rpc_invoker_mutex_;
  487. #endif
  488. std::unordered_map<std::string, std::shared_ptr<fntype>> rpc_invokers_
  489. #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE)
  490. ASIO2_GUARDED_BY(rpc_invoker_mutex_)
  491. #endif
  492. ;
  493. };
  494. }
  495. #endif // !__ASIO2_RPC_INVOKER_HPP__