/* * Copyright (c) 2017-2023 zhllxt * * author : zhllxt * email : 37792738@qq.com * * Distributed under the Boost Software License, Version 1.0. (See accompanying * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) */ #ifndef __ASIO2_RPC_INVOKER_HPP__ #define __ASIO2_RPC_INVOKER_HPP__ #if defined(_MSC_VER) && (_MSC_VER >= 1200) #pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace asio2::detail { // forward declare template class rpc_invoker_t; } namespace asio2::rpc { template class response_defer { template friend class asio2::detail::rpc_invoker_t; public: response_defer() = default; ~response_defer() { ASIO2_ASSERT(f_); if (f_) { f_(); } } template inline void set_value(V&& v) { v_ = std::forward(v); } protected: template inline void _bind (F&& f) { f_ = std::forward(f); } protected: std::optional v_{}; std::function f_{}; }; template class future { template friend class asio2::detail::rpc_invoker_t; public: future() = delete; future(std::shared_ptr> defer) noexcept : defer_(std::move(defer)) { } ~future() = default; future(future&&) noexcept = default; future(future const&) = default; future& operator=(future&&) noexcept = default; future& operator=(future const&) = default; protected: std::shared_ptr> defer_{}; }; template class promise { template friend class asio2::detail::rpc_invoker_t; public: promise() = default; ~promise() = default; promise(promise&&) noexcept = default; promise(promise const&) = default; promise& operator=(promise&&) noexcept = default; promise& operator=(promise const&) = default; inline future get_future() const noexcept { return future{ defer_ }; } template inline void set_value(V&& v) { defer_->set_value(std::forward(v)); } protected: std::shared_ptr> defer_ = std::make_shared>(); }; //--------------------------------------------------------------------------------------------- // specialize for void //--------------------------------------------------------------------------------------------- template<> class response_defer { template friend class asio2::detail::rpc_invoker_t; public: response_defer() = default; ~response_defer() { ASIO2_ASSERT(f_); if (f_) { f_(); } } template inline void set_value() { v_ = '0'; } protected: template inline void _bind (F&& f) { f_ = std::forward(f); } protected: std::optional v_{}; std::function f_{}; }; template<> class future { template friend class asio2::detail::rpc_invoker_t; public: future() = delete; future(std::shared_ptr> defer) noexcept : defer_(std::move(defer)) { } ~future() = default; future(future&&) noexcept = default; future(future const&) = default; future& operator=(future&&) noexcept = default; future& operator=(future const&) = default; protected: std::shared_ptr> defer_{}; }; template<> class promise { template friend class asio2::detail::rpc_invoker_t; public: promise() = default; ~promise() = default; promise(promise&&) noexcept = default; promise(promise const&) = default; promise& operator=(promise&&) noexcept = default; promise& operator=(promise const&) = default; inline future get_future() const noexcept { return future{ defer_ }; } template inline void set_value() { defer_->set_value(); } protected: std::shared_ptr> defer_ = std::make_shared>(); }; } namespace asio2::detail { template struct rpc_result_t { using type = typename std::remove_cv_t>; }; template<> struct rpc_result_t { using type = std::int8_t; }; template class rpc_invoker_t { protected: struct dummy {}; public: using self = rpc_invoker_t; using fntype = std::function< bool(std::shared_ptr&, caller_t*, rpc_serializer&, rpc_deserializer&)>; /** * @brief constructor */ rpc_invoker_t() = default; /** * @brief destructor */ ~rpc_invoker_t() = default; rpc_invoker_t(rpc_invoker_t&& o) noexcept : rpc_invokers_(std::move(o.rpc_invokers_)) { } rpc_invoker_t(rpc_invoker_t const& o) : rpc_invokers_(o.rpc_invokers_) { } rpc_invoker_t& operator=(rpc_invoker_t&& o) noexcept { this->rpc_invokers_ = std::move(o.rpc_invokers_); } rpc_invoker_t& operator=(rpc_invoker_t const& o) { this->rpc_invokers_ = o.rpc_invokers_; } /** * @brief bind a rpc function * @param name - Function name in string format. * @param fun - Function object. * @param obj - A pointer or reference to a class object, this parameter can be none. * if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. */ template inline self& bind(std::string name, F&& fun, C&&... obj) { asio2::trim_both(name); ASIO2_ASSERT(!name.empty()); if (name.empty()) return (*this); #if defined(_DEBUG) || defined(DEBUG) { #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) asio2::shared_locker guard(this->rpc_invoker_mutex_); #endif ASIO2_ASSERT(this->rpc_invokers_.find(name) == this->rpc_invokers_.end()); } #endif this->_bind(std::move(name), std::forward(fun), std::forward(obj)...); return (*this); } /** * @brief unbind a rpc function */ inline self& unbind(std::string const& name) { #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) asio2::unique_locker guard(this->rpc_invoker_mutex_); #endif this->rpc_invokers_.erase(name); return (*this); } /** * @brief find binded rpc function by name */ inline std::shared_ptr find(std::string const& name) { #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) asio2::shared_locker guard(this->rpc_invoker_mutex_); #endif if (auto iter = this->rpc_invokers_.find(name); iter != this->rpc_invokers_.end()) return iter->second; return nullptr; } protected: inline self& _invoker() noexcept { return (*this); } inline self const& _invoker() const noexcept { return (*this); } template inline void _bind(std::string name, F f) { #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) asio2::unique_locker guard(this->rpc_invoker_mutex_); #endif this->rpc_invokers_[std::move(name)] = std::make_shared(std::bind(&self::template _proxy, this, std::move(f), nullptr, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } template inline void _bind(std::string name, F f, C& c) { #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) asio2::unique_locker guard(this->rpc_invoker_mutex_); #endif this->rpc_invokers_[std::move(name)] = std::make_shared(std::bind(&self::template _proxy, this, std::move(f), std::addressof(c), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } template inline void _bind(std::string name, F f, C* c) { #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) asio2::unique_locker guard(this->rpc_invoker_mutex_); #endif this->rpc_invokers_[std::move(name)] = std::make_shared(std::bind(&self::template _proxy, this, std::move(f), c, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } template inline bool _proxy(F& f, C* c, std::shared_ptr& caller_ptr, caller_t* caller, rpc_serializer& sr, rpc_deserializer& dr) { using fun_traits_type = function_traits; return _argc_proxy(f, c, caller_ptr, caller, sr, dr); } template typename std::enable_if_t inline _argc_proxy(F& f, C* c, std::shared_ptr& caller_ptr, caller_t* caller, rpc_serializer& sr, rpc_deserializer& dr) { using fun_traits_type = function_traits; using fun_args_tuple = typename fun_traits_type::pod_tuple_type; using fun_ret_type = typename fun_traits_type::return_type; fun_args_tuple tp; detail::for_each_tuple(tp, [&dr](auto& elem) mutable { dr >> elem; }); return _invoke(f, c, caller_ptr, caller, sr, dr, std::move(tp)); } template typename std::enable_if_t inline _argc_proxy(F& f, C* c, std::shared_ptr& caller_ptr, caller_t* caller, rpc_serializer& sr, rpc_deserializer& dr) { detail::ignore_unused(caller); using fun_traits_type = function_traits; using fun_args_tuple = typename fun_traits_type::pod_tuple_type; using fun_ret_type = typename fun_traits_type::return_type; using arg0_type = typename std::remove_cv_t::type>>; if constexpr /**/ (std::is_same_v>) { auto tp = _body_args_tuple((fun_args_tuple*)0); detail::for_each_tuple(tp, [&dr](auto& elem) mutable { dr >> elem; }); auto tp_new = std::tuple_cat(std::tuple&>(caller_ptr), tp); return _invoke(f, c, caller_ptr, caller, sr, dr, std::move(tp_new)); } else if constexpr (std::is_same_v) { auto tp = _body_args_tuple((fun_args_tuple*)0); detail::for_each_tuple(tp, [&dr](auto& elem) mutable { dr >> elem; }); auto tp_new = std::tuple_cat(std::tuple(*caller), tp); return _invoke(f, c, caller_ptr, caller, sr, dr, std::move(tp_new)); } else { fun_args_tuple tp; detail::for_each_tuple(tp, [&dr](auto& elem) mutable { dr >> elem; }); return _invoke(f, c, caller_ptr, caller, sr, dr, std::move(tp)); } } template inline decltype(auto) _body_args_tuple(std::tuple* tp) { return (_body_args_tuple_impl(std::make_index_sequence{}, tp)); } template inline decltype(auto) _body_args_tuple_impl(std::index_sequence, std::tuple*) noexcept { return (std::tuple>::type...>{}); } template inline bool _invoke_with_future(F& f, C* c, std::shared_ptr& caller_ptr, caller_t* caller, rpc_serializer& sr, rpc_deserializer& dr, typename rpc_result_t::type r) { detail::ignore_unused(f, c, caller_ptr, caller, sr, dr); error_code ec = rpc::make_error_code(rpc::error::success); if (dr.buffer().in_avail() != 0) { ec = rpc::make_error_code(rpc::error::invalid_argument); } auto* defer = r.defer_.get(); detail::io_context_work_guard iocg(caller->io_->context().get_executor()); r.defer_->_bind( [caller_ptr, caller, &sr, ec, head = caller->header_, defer, iocg = std::move(iocg)]() mutable { detail::ignore_unused(caller_ptr, iocg); if (head.id() == static_cast(0)) return; // the "header_, async_send" should not appear in this "invoker" module, But I thought // for a long time and couldn't find of a good method to solve this problem. // the operator for "sr" must be in the io_context thread. asio::dispatch(caller->io_->context(), make_allocator(caller->wallocator(), [caller_ptr = std::move(caller_ptr), caller, &sr, ec, head = std::move(head), v = std::move(defer->v_)] () mutable { ASIO2_ASSERT(caller->io_->running_in_this_thread()); if (!caller->is_started()) return; head.type(rpc_type_rep); if (v.has_value() == false && (!ec)) { ec = rpc::make_error_code(rpc::error::no_data); } sr.reset(); sr << head; sr << ec; #if !defined(ASIO_NO_EXCEPTIONS) && !defined(BOOST_ASIO_NO_EXCEPTIONS) try { #endif if constexpr (!std::is_same_v, R>) { if (!ec) { sr << std::move(v.value()); // maybe throw some exception } } else { std::ignore = v; } caller->internal_async_send(std::move(caller_ptr), sr.str()); #if !defined(ASIO_NO_EXCEPTIONS) && !defined(BOOST_ASIO_NO_EXCEPTIONS) return; // not exception, return } catch (cereal::exception const&) { if (!ec) ec = rpc::make_error_code(rpc::error::invalid_argument); } catch (std::exception const&) { if (!ec) ec = rpc::make_error_code(rpc::error::unspecified_error); } // the error_code must not be 0. ASIO2_ASSERT(ec); // code run to here, it means that there has some exception. sr.reset(); sr << head; sr << ec; caller->internal_async_send(std::move(caller_ptr), sr.str()); #endif })); }); return true; } // async - return true, sync - return false template inline bool _invoke(F& f, C* c, std::shared_ptr& caller_ptr, caller_t* caller, rpc_serializer& sr, rpc_deserializer& dr, std::tuple&& tp) { detail::ignore_unused(caller_ptr, caller, sr, dr); if (caller_ptr) { detail::get_current_object>() = caller_ptr; } else { detail::get_current_object() = caller; } typename rpc_result_t::type r = _invoke_impl(f, c, std::make_index_sequence{}, std::move(tp)); if constexpr (detail::is_template_instance_of_v) { return _invoke_with_future(f, c, caller_ptr, caller, sr, dr, std::move(r)); } else if constexpr (!std::is_same_v) { sr << rpc::make_error_code(rpc::error::success); sr << r; return false; } else { sr << rpc::make_error_code(rpc::error::success); std::ignore = r; return false; } } template typename std::enable_if_t, typename rpc_result_t::type> inline _invoke_impl(F& f, C* c, std::index_sequence, std::tuple&& tp) { detail::ignore_unused(c); if constexpr (std::is_same_v, dummy>) return f(std::get(std::move(tp))...); else return (c->*f)(std::get(std::move(tp))...); } template typename std::enable_if_t, typename rpc_result_t::type> inline _invoke_impl(F& f, C* c, std::index_sequence, std::tuple&& tp) { detail::ignore_unused(c); if constexpr (std::is_same_v, dummy>) f(std::get(std::move(tp))...); else (c->*f)(std::get(std::move(tp))...); return 1; } protected: #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) mutable asio2::shared_mutexer rpc_invoker_mutex_; #endif std::unordered_map> rpc_invokers_ #if defined(ASIO2_ENABLE_RPC_INVOKER_THREAD_SAFE) ASIO2_GUARDED_BY(rpc_invoker_mutex_) #endif ; }; } #endif // !__ASIO2_RPC_INVOKER_HPP__