/*
 * Copyright (c) 2017-2023 zhllxt
 *
 * author   : zhllxt
 * email    : 37792738@qq.com
 *
 * Distributed under the Boost Software License, Version 1.0. (See accompanying
 * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 */

// NOTE(review): in this copy of the file the operands of the #include
// directives and several angle-bracketed template argument lists appear to be
// missing (for example `template class server_impl_t`, `reinterpret_cast(this)`,
// `std::forward(tcos)`). They are left exactly as found here; restore them from
// the authoritative copy of this header before building.

#ifndef __ASIO2_SERVER_HPP__
#define __ASIO2_SERVER_HPP__

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
#pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include

#include
#include
#include
#include
#include
#include

#include
#include
#include
#include
#include

#include
#include
#include
#include
#include

#include
#include
#include
#include
#include
#include
#include

namespace asio2
{
	/**
	 * @brief Common non-template base of every server type.
	 * Exposes three compile-time role flags; for a server only
	 * is_server() yields true.
	 */
	class server
	{
	public:
		inline constexpr static bool is_session() noexcept { return false; }
		inline constexpr static bool is_client () noexcept { return false; }
		inline constexpr static bool is_server () noexcept { return true ; }
	};
}

namespace asio2::detail
{
	ASIO2_CLASS_FORWARD_DECLARE_BASE;

	/**
	 * @brief Base implementation shared by the concrete server classes.
	 * The class reaches the concrete type through this->derived() and
	 * owns the listener, the session manager, the server state and the
	 * two handler allocators declared at the bottom of the class.
	 */
	template class server_impl_t
		: public asio2::server
		, public object_t
		, public iopool_cp
		, public io_context_cp
		, public thread_id_cp
		, public event_queue_cp
		, public user_data_cp
		, public user_timer_cp
		, public post_cp
		, public condition_event_cp
	{
		ASIO2_CLASS_FRIEND_DECLARE_BASE;

	public:
		using super = object_t ;
		using self = server_impl_t;

		using iopoolcp = iopool_cp ;

		using args_type = typename session_t::args_type;
		using key_type = std::size_t;

	public:
		/**
		 * @brief constructor
		 * @param tcos - forwarded unchanged to the iopool_cp base; judging by
		 * the parameter type name it is either a thread count or a scheduler
		 * (confirm against iopool_cp).
		 * The io_context_cp base is initialized with iopoolcp::_get_io(0), and
		 * the session manager receives this->io_ and this->state_.
		 */
		template explicit server_impl_t(ThreadCountOrScheduler&& tcos)
			: object_t ()
			, iopool_cp (std::forward(tcos))
			, io_context_cp (iopoolcp::_get_io(0))
			, thread_id_cp ()
			, event_queue_cp ()
			, user_data_cp ()
			, user_timer_cp ()
			, post_cp ()
			, condition_event_cp()
			, rallocator_()
			, wallocator_()
			, listener_ ()
			, sessions_ (this->io_, this->state_)
		{
		}

		/**
		 * @brief destructor
		 */
		~server_impl_t()
		{
		}

		/**
		 * @brief start the server
		 * At this level the function only asserts that it is running in the
		 * thread of this->io_ and then returns true.
		 * @return always true.
		 */
		inline bool start() noexcept
		{
			ASIO2_ASSERT(this->io_->running_in_this_thread());

			return true;
		}

		/**
		 * @brief stop the server
		 * Asserts that it is running in the thread of this->io_, then
		 * dispatches a handler that stops the timers and timed events,
		 * notifies all condition events and releases user_data_ and ecs_.
		 */
		inline void stop()
		{
			ASIO2_ASSERT(this->io_->running_in_this_thread());

			// post can't be used here: when the derived stop is called we need
			// to be sure that the whole chain has been executed to completion.
			this->derived().dispatch([this]() mutable
			{
				// close the user custom timers
				this->_dispatch_stop_all_timers();

				// close all posted timed tasks
				this->_dispatch_stop_all_timed_events();

				// close all async events
				this->notify_all_condition_events();

				// destroy the user data. The user data may be a shared_ptr to
				// this object itself; if it is not destroyed that would be a
				// reference cycle.
				// Reading or writing the user data from a thread other than the
				// io_context thread may cause a crash.
				this->user_data_.reset();

				// destroy the ecs
				this->ecs_.reset();
			});
		}

		/**
		 * @brief destroy the content of all member variables; this is used to solve memory leaks.
		 * After this function is called, this class object cannot be used again.
		 * Concretely: io_ is reset, listener_ is cleared and destroy_iopool()
		 * is called, all through the derived object.
		 */
		inline void destroy()
		{
			derived_t& derive = this->derived();

			derive.io_.reset();

			derive.listener_.clear();

			derive.destroy_iopool();
		}

		/**
		 * @brief check whether the server is started
		 * @return true when state_ equals state_t::started.
		 */
		inline bool is_started() const noexcept
		{
			return (this->state_ == state_t::started);
		}

		/**
		 * @brief check whether the server is stopped
		 * @return true when state_ equals state_t::stopped.
		 */
		inline bool is_stopped() const noexcept
		{
			return (this->state_ == state_t::stopped);
		}

		/**
		 * @brief get this object's hash key
		 * @return the value produced by reinterpret_cast from the this pointer.
		 */
		inline key_type hash_key() const noexcept
		{
			return reinterpret_cast(this);
		}

		/**
		 * @brief Asynchronously send data to every session.
		 * Multiple data formats are supported, see asio::buffer(...) in /asio/buffer.hpp
		 * According to the original documentation this function may be called
		 * from the communication thread or from anywhere else and is thread-safe.
		 * PodType * : async_send("abc");
		 * PodType (&data)[N] : double m[10]; async_send(m);
		 * std::array : std::array m; async_send(m);
		 * std::vector : std::vector m; async_send(m);
		 * std::basic_string : std::string m; async_send(m);
		 * @param data - passed to async_send of each session visited by
		 * sessions_.quick_for_each; the lambda holds it by reference.
		 * @return reference to the derived object.
		 */
		template inline derived_t & async_send(const T& data)
		{
			this->sessions_.quick_for_each([&data](std::shared_ptr& session_ptr) mutable
			{
				session_ptr->async_send(data);
			});
			return this->derived();
		}

		/**
		 * @brief Asynchronously send data to every session.
		 * According to the original documentation this function may be called
		 * from the communication thread or from anywhere else and is thread-safe.
		 * PodType * : async_send("abc");
		 * @param s - character pointer; the length is taken from
		 * Traits::length(s), or 0 when s is null.
		 * @return reference to the derived object.
		 */
		template> inline typename std::enable_if_t, derived_t&> async_send(CharT* s)
		{
			return this->async_send(s, s ? Traits::length(s) : 0);
		}

		/**
		 * @brief Asynchronously send data to every session.
		 * According to the original documentation this function may be called
		 * from the communication thread or from anywhere else and is thread-safe.
		 * PodType (&data)[N] : double m[10]; async_send(m,5);
		 * @param s - pointer to the data; when it is null nothing is sent.
		 * @param count - passed together with s to async_send of each session.
		 * @return reference to the derived object.
		 */
		template inline typename std::enable_if_t>, derived_t&> async_send(CharT* s, SizeT count)
		{
			if (s)
			{
				// the pointer itself is copied into the lambda, not the data it
				// refers to.
				this->sessions_.quick_for_each([s, count](std::shared_ptr& session_ptr) mutable
				{
					session_ptr->async_send(s, count);
				});
			}
			return this->derived();
		}

	public:
		/**
		 * @brief get the acceptor reference, derived classes must override this function
		 * This version only forwards to derived().acceptor().
		 */
		inline auto & acceptor() noexcept { return this->derived().acceptor(); }

		/**
		 * @brief get the acceptor reference, derived classes must override this function
		 * Const version; it also only forwards to derived().acceptor().
		 */
		inline auto const& acceptor() const noexcept { return this->derived().acceptor(); }

		/**
		 * @brief get the listen address, same as get_listen_address
		 */
		inline std::string listen_address() const noexcept
		{
			return this->get_listen_address();
		}

		/**
		 * @brief get the listen address
		 * @return the string form of the address of the acceptor's local
		 * endpoint. If a system_error is caught it is stored with
		 * set_last_error and an empty string is returned.
		 */
		inline std::string get_listen_address() const noexcept
		{
			try
			{
				return this->acceptor().local_endpoint().address().to_string();
			}
			catch (system_error & e)
			{
				set_last_error(e);
			}
			return std::string();
		}

		/**
		 * @brief get the listen port, same as get_listen_port
		 */
		inline unsigned short listen_port() const noexcept
		{
			return this->get_listen_port();
		}

		/**
		 * @brief get the listen port
		 * @return the port of the acceptor's local endpoint. get_last_error()
		 * is passed to local_endpoint(), presumably so that a failure is
		 * recorded there instead of being thrown - confirm.
		 */
		inline unsigned short get_listen_port() const noexcept
		{
			return this->acceptor().local_endpoint(get_last_error()).port();
		}

		/**
		 * @brief get connected session count, same as get_session_count
		 */
		inline std::size_t session_count() const noexcept { return this->get_session_count(); }

		/**
		 * @brief get connected session count
		 * @return the value of sessions_.size().
		 */
		inline std::size_t get_session_count() const noexcept { return this->sessions_.size(); }

		/**
		 * @brief Applies the given function object fn to each session.
		 * @param fn - The handler to be called for each session.
		 * Function signature :
		 * void(std::shared_ptr& session_ptr)
		 * @return reference to the derived object.
		 */
		template inline derived_t & foreach_session(Fun&& fn)
		{
			this->sessions_.for_each(std::forward(fn));
			return this->derived();
		}

		/**
		 * @brief find the session by the session's hash key
		 * @param key - passed to sessions_.find.
		 * @return whatever sessions_.find(key) returns.
		 */
		template inline std::shared_ptr find_session(const KeyType& key)
		{
			return this->sessions_.find(key);
		}

		/**
		 * @brief find the session by a user-defined predicate
		 * @param fn - The handler to be called when searching for the session.
		 * Function signature :
		 * bool(std::shared_ptr& session_ptr)
		 * @return std::shared_ptr built from the result of sessions_.find_if.
		 */
		template inline std::shared_ptr find_session_if(Fun&& fn)
		{
			return std::shared_ptr(this->sessions_.find_if(std::forward(fn)));
		}

	protected:
		/**
		 * @brief get the recv/read allocator object reference
		 */
		inline auto & rallocator() noexcept { return this->rallocator_; }

		/**
		 * @brief get the send/write/post allocator object reference
		 */
		inline auto & wallocator() noexcept { return this->wallocator_; }

		/// get the session manager reference
		inline session_mgr_t & sessions() noexcept { return this->sessions_; }

		/// get the listener reference
		inline listener_t & listener() noexcept { return this->listener_; }

		/// get the server state reference
		inline std::atomic & state () noexcept { return this->state_; }

	protected:
		/// The memory to use for handler-based custom memory allocation. used for acceptor.
		handler_memory> rallocator_;

		/// The memory to use for handler-based custom memory allocation. used for send/write/post.
		handler_memory> wallocator_;

		/// listener
		listener_t listener_;

		/// state, initially state_t::stopped
		std::atomic state_ = state_t::stopped;

		/// session_mgr
		session_mgr_t sessions_;

		/// use this to ensure that the server stops only after all sessions are closed
		std::shared_ptr counter_ptr_;

		/// the pointer of ecs_t
		std::shared_ptr ecs_;

		// the two counters below exist only when _DEBUG or DEBUG is defined
	#if defined(_DEBUG) || defined(DEBUG)
		std::atomic post_send_counter_ = 0;
		std::atomic post_recv_counter_ = 0;
	#endif
	};
}

#include

#endif // !__ASIO2_SERVER_HPP__