/* * Copyright (c) 2017-2023 zhllxt * * author : zhllxt * email : 37792738@qq.com * * Distributed under the Boost Software License, Version 1.0. (See accompanying * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) */ #ifndef __ASIO2_TCP_SERVER_HPP__ #define __ASIO2_TCP_SERVER_HPP__ #if defined(_MSC_VER) && (_MSC_VER >= 1200) #pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include #include #include namespace asio2::detail { ASIO2_CLASS_FORWARD_DECLARE_BASE; ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE; ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER; template class tcp_server_impl_t : public server_impl_t, public tcp_tag { ASIO2_CLASS_FRIEND_DECLARE_BASE; ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE; ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER; public: using super = server_impl_t ; using self = tcp_server_impl_t; using session_type = session_t; public: /** * @brief constructor */ explicit tcp_server_impl_t( std::size_t init_buf_size = tcp_frame_size, std::size_t max_buf_size = max_buffer_size, std::size_t concurrency = default_concurrency() + 1 // The 1 is used for tcp acceptor ) : super(concurrency) , acceptor_ (std::make_unique(this->io_->context())) , acceptor_timer_ (std::make_unique(this->io_->context())) , counter_timer_ (std::make_unique(this->io_->context())) , init_buffer_size_(init_buf_size) , max_buffer_size_ ( max_buf_size) { } template>, int> = 0> explicit tcp_server_impl_t( std::size_t init_buf_size, std::size_t max_buf_size, Scheduler&& scheduler ) : super(std::forward(scheduler)) , acceptor_ (std::make_unique(this->io_->context())) , acceptor_timer_ (std::make_unique(this->io_->context())) , counter_timer_ (std::make_unique(this->io_->context())) , init_buffer_size_(init_buf_size) , max_buffer_size_ ( max_buf_size) { } template>, int> = 0> explicit tcp_server_impl_t(Scheduler&& scheduler) : tcp_server_impl_t(tcp_frame_size, max_buffer_size, std::forward(scheduler)) { } // -- Support initializer_list causes the code of inherited classes to be not concised //template>, int> = 0> //explicit tcp_server_impl_t( // std::size_t init_buf_size, // std::size_t max_buf_size, // std::initializer_list scheduler //) // : tcp_server_impl_t(init_buf_size, max_buf_size, std::vector{std::move(scheduler)}) //{ //} //template>, int> = 0> //explicit tcp_server_impl_t(std::initializer_list scheduler) // : tcp_server_impl_t(tcp_frame_size, max_buffer_size, std::move(scheduler)) //{ //} /** * @brief destructor */ ~tcp_server_impl_t() { this->stop(); } /** * @brief start the server * @param host - A string identifying a location. May be a descriptive name or * a numeric address string. * @param service - A string identifying the requested service. This may be a * descriptive name or a numeric string corresponding to a port number. * @param args - The delimiter condition.Valid value types include the following: * char,std::string,std::string_view, * function:std::pair match_condition(iterator begin, iterator end), * asio::transfer_at_least,asio::transfer_exactly * more details see asio::read_until */ template inline bool start(String&& host, StrOrInt&& service, Args&&... args) { return this->derived()._do_start( std::forward(host), std::forward(service), ecs_helper::make_ecs(asio::transfer_at_least(1), std::forward(args)...)); } /** * @brief stop the server * You can call this function on the communication thread and anywhere to stop the server. */ inline void stop() { if (this->is_iopool_stopped()) return; derived_t& derive = this->derived(); derive.io_->unregobj(&derive); derive.post([&derive]() mutable { derive._do_stop(asio::error::operation_aborted, derive.selfptr()); }); this->stop_iopool(); // asio bug , see : https://www.boost.org/users/history/version_1_72_0.html // Fixed a lost "outstanding work count" that can occur when an asynchronous // accept operation is automatically restarted. // Using boost 1.72.0 or above version can avoid this problem (asio 1.16.0) } /** * @brief destroy the content of all member variables, this is used for solve the memory leaks. * After this function is called, this class object cannot be used again. */ inline void destroy() { derived_t& derive = this->derived(); derive.counter_timer_.reset(); derive.acceptor_timer_.reset(); derive.acceptor_.reset(); super::destroy(); } /** * @brief check whether the server is started */ inline bool is_started() const { return (super::is_started() && this->acceptor_->is_open()); } /** * @brief check whether the server is stopped */ inline bool is_stopped() const { return (this->state_ == state_t::stopped && !this->acceptor_->is_open()); } public: /** * @brief bind recv listener * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * Function signature : void(std::shared_ptr& session_ptr, std::string_view data) */ template inline derived_t & bind_recv(F&& fun, C&&... obj) { this->listener_.bind(event_type::recv, observer_t&, std::string_view>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } /** * @brief bind accept listener * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * This notification is invoked immediately when the server accept a new connection * Function signature : void(std::shared_ptr& session_ptr) */ template inline derived_t & bind_accept(F&& fun, C&&... obj) { this->listener_.bind(event_type::accept, observer_t&>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } /** * @brief bind connect listener * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * This notification is invoked after the connection is fully established, * and only after the connect/handshake/upgrade are completed. * Function signature : void(std::shared_ptr& session_ptr) */ template inline derived_t & bind_connect(F&& fun, C&&... obj) { this->listener_.bind(event_type::connect, observer_t&>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } /** * @brief bind disconnect listener * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * This notification is invoked before the connection is disconnected, you can call * get_last_error/last_error_msg, etc, to get the disconnected error information * Function signature : void(std::shared_ptr& session_ptr) * If is http or websocket server, when enter the disconnect callback, the socket of the * session maybe closed already. */ template inline derived_t & bind_disconnect(F&& fun, C&&... obj) { this->listener_.bind(event_type::disconnect, observer_t&>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } /** * @brief bind init listener,we should set socket options at here * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * This notification is called after the socket is opened. * You can set the socket option in this notification. * Function signature : void() */ template inline derived_t & bind_init(F&& fun, C&&... obj) { this->listener_.bind(event_type::init, observer_t<>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } /** * @brief bind start listener * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * This notification is called after the server starts up, whether successful or unsuccessful * Function signature : void() */ template inline derived_t & bind_start(F&& fun, C&&... obj) { this->listener_.bind(event_type::start, observer_t<>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } /** * @brief bind stop listener * @param fun - a user defined callback function. * @param obj - a pointer or reference to a class object, this parameter can be none. * @li if fun is nonmember function, the obj param must be none, otherwise the obj must be the * the class object's pointer or reference. * This notification is called before the server is ready to stop * Function signature : void() */ template inline derived_t & bind_stop(F&& fun, C&&... obj) { this->listener_.bind(event_type::stop, observer_t<>( std::forward(fun), std::forward(obj)...)); return (this->derived()); } public: /** * @brief get the acceptor reference */ inline asio::ip::tcp::acceptor & acceptor() noexcept { return *(this->acceptor_); } /** * @brief get the acceptor reference */ inline asio::ip::tcp::acceptor const& acceptor() const noexcept { return *(this->acceptor_); } protected: template inline bool _do_start(String&& host, StrOrInt&& port, std::shared_ptr> ecs) { derived_t& derive = this->derived(); // if log is enabled, init the log first, otherwise when "Too many open files" error occurs, // the log file will be created failed too. #if defined(ASIO2_ENABLE_LOG) asio2::detail::get_logger(); #endif this->start_iopool(); if (!this->is_iopool_started()) { set_last_error(asio::error::operation_aborted); return false; } asio::dispatch(derive.io_->context(), [&derive, this_ptr = derive.selfptr()]() mutable { detail::ignore_unused(this_ptr); // init the running thread id derive.io_->init_thread_id(); }); // use promise to get the result of async accept std::promise promise; std::future future = promise.get_future(); // use derfer to ensure the promise's value must be seted. detail::defer_event pg { [promise = std::move(promise)]() mutable { promise.set_value(get_last_error()); } }; derive.post( [this, this_ptr = derive.selfptr(), ecs = std::move(ecs), pg = std::move(pg), host = std::forward(host), port = std::forward(port)] () mutable { derived_t& derive = this->derived(); state_t expected = state_t::stopped; if (!this->state_.compare_exchange_strong(expected, state_t::starting)) { // if the state is not stopped, set the last error to already_started set_last_error(asio::error::already_started); return; } // must read/write ecs in the io_context thread. derive.ecs_ = ecs; derive.io_->regobj(&derive); #if defined(_DEBUG) || defined(DEBUG) this->sessions_.is_all_session_stop_called_ = false; this->is_stop_called_ = false; #endif // convert to string maybe throw some exception. std::string h = detail::to_string(std::move(host)); std::string p = detail::to_string(std::move(port)); expected = state_t::starting; if (!this->state_.compare_exchange_strong(expected, state_t::starting)) { ASIO2_ASSERT(false); derive._handle_start(asio::error::operation_aborted, std::move(this_ptr), std::move(ecs)); return; } super::start(); this->counter_ptr_ = std::shared_ptr((void*)1, [&derive](void*) mutable { derive._exec_stop(asio::error::operation_aborted, derive.selfptr()); }); error_code ec, ec_ignore; this->acceptor_->cancel(ec_ignore); this->acceptor_->close(ec_ignore); // parse address and port asio::ip::tcp::resolver resolver(this->io_->context()); auto results = resolver.resolve(h, p, asio::ip::resolver_base::flags::passive | asio::ip::resolver_base::flags::address_configured, ec); if (ec) { derive._handle_start(ec, std::move(this_ptr), std::move(ecs)); return; } if (results.empty()) { derive._handle_start(asio::error::host_not_found, std::move(this_ptr), std::move(ecs)); return; } asio::ip::tcp::endpoint endpoint = *results.begin(); this->acceptor_->open(endpoint.protocol(), ec); if (ec) { derive._handle_start(ec, std::move(this_ptr), std::move(ecs)); return; } // when you close socket in linux system,and start socket // immediate,you will get like this "the address is in use", // and bind is failed,but i'm suer i close the socket correct // already before,why does this happen? the reasion is the // socket option "TIME_WAIT",although you close the socket, // but the system not release the socket,util 2~4 seconds // later,so we can use the SO_REUSEADDR option to avoid this // problem,like below // set port reuse this->acceptor_->set_option(asio::ip::tcp::acceptor::reuse_address(true), ec_ignore); clear_last_error(); derive._fire_init(); this->acceptor_->bind(endpoint, ec); if (ec) { derive._handle_start(ec, std::move(this_ptr), std::move(ecs)); return; } this->acceptor_->listen(asio::socket_base::max_listen_connections, ec); if (ec) { derive._handle_start(ec, std::move(this_ptr), std::move(ecs)); return; } // if the some error occured in the _fire_init notify function, the // get_last_error maybe not zero, so if we use _handle_start(get_last_error()... // at here, the start will failed, and the user don't know what happend. // so we need use as this : _handle_start(error_code{}... derive._handle_start(ec, std::move(this_ptr), std::move(ecs)); }); if (!derive.io_->running_in_this_thread()) { set_last_error(future.get()); return static_cast(!get_last_error()); } else { set_last_error(asio::error::in_progress); } // if the state is stopped , the return value is "is_started()". // if the state is stopping, the return value is false, the last error is already_started // if the state is starting, the return value is false, the last error is already_started // if the state is started , the return value is true , the last error is already_started return derive.is_started(); } template inline void _handle_start(error_code ec, std::shared_ptr this_ptr, std::shared_ptr> ecs) { ASIO2_ASSERT(this->derived().io_->running_in_this_thread()); // Whether the startup succeeds or fails, always call fire_start notification state_t expected = state_t::starting; if (!ec) if (!this->state_.compare_exchange_strong(expected, state_t::started)) ec = asio::error::operation_aborted; set_last_error(ec); this->derived()._fire_start(); expected = state_t::started; if (!ec) if (!this->state_.compare_exchange_strong(expected, state_t::started)) ec = asio::error::operation_aborted; if (ec) { this->derived()._do_stop(ec, std::move(this_ptr)); return; } this->derived()._post_accept(std::move(this_ptr), std::move(ecs)); } inline void _do_stop(const error_code& ec, std::shared_ptr this_ptr) { ASIO2_ASSERT(this->derived().io_->running_in_this_thread()); state_t expected = state_t::starting; if (this->state_.compare_exchange_strong(expected, state_t::stopping)) return this->derived()._post_stop(ec, std::move(this_ptr), expected); expected = state_t::started; if (this->state_.compare_exchange_strong(expected, state_t::stopping)) return this->derived()._post_stop(ec, std::move(this_ptr), expected); } inline void _post_stop(const error_code& ec, std::shared_ptr this_ptr, state_t old_state) { // asio don't allow operate the same socket in multi thread, // if you close socket in one thread and another thread is // calling socket's async_... function,it will crash.so we // must care for operate the socket. when need close the // socket ,we use the io_context to post a event,make sure the // socket's close operation is in the same thread. asio::dispatch(this->derived().io_->context(), make_allocator(this->derived().wallocator(), [this, ec, old_state, this_ptr = std::move(this_ptr)]() mutable { detail::ignore_unused(this, ec, old_state, this_ptr); set_last_error(ec); ASIO2_ASSERT(this->state_ == state_t::stopping); // start timer to hold the acceptor io_context // should hold the server shared ptr too, if server is constructed with iopool, and // server is a tmp local variable, then the server maybe destroyed before sessions. // so we need hold this ptr to ensure server must be destroyed after sessions. this->counter_timer_->expires_after((std::chrono::nanoseconds::max)()); this->counter_timer_->async_wait([this_ptr](const error_code&) { detail::ignore_unused(this_ptr); }); // stop all the sessions, the session::stop must be no blocking, // otherwise it may be cause loop lock. this->sessions_.quick_for_each([](std::shared_ptr & session_ptr) mutable { session_ptr->stop(); }); #if defined(_DEBUG) || defined(DEBUG) // Check whether all sessions are evenly distributed in io threads std::vector> iots; std::vector session_counter; iots.resize(this->iopool().size()); session_counter.resize(this->iopool().size()); for (std::size_t i = 0; i < iots.size(); ++i) { iots[i] = this->_get_io(i); } this->sessions_.quick_for_each([&iots, &session_counter](std::shared_ptr& session_ptr) mutable { for (std::size_t i = 0; i < iots.size(); ++i) { if (session_ptr->io_ == iots[i]) { session_counter[i]++; break; } } }); if (iots.size() > std::size_t(2) && this->get_session_count() > ((iots.size() - 1) * 5)) { ASIO2_ASSERT(session_counter[0] == 0); int count_diff = (std::max)(int(this->get_session_count() / (iots.size() - 1) / 10), 10); for (std::size_t i = 1; i < iots.size(); ++i) { ASIO2_ASSERT(std::abs(session_counter[1] - session_counter[i]) < count_diff); } } asio2::ignore_unused(iots, session_counter); asio2::ignore_unused(this->sessions_.empty()); // used to test ThreadSafetyAnalysis #endif #if defined(_DEBUG) || defined(DEBUG) this->sessions_.is_all_session_stop_called_ = true; #endif if (this->counter_ptr_) { this->counter_ptr_.reset(); } else { this->derived()._exec_stop(ec, std::move(this_ptr)); } })); } inline void _exec_stop(const error_code& ec, std::shared_ptr this_ptr) { // use asio::post to ensure this server's _handle_stop is called must be after // all sessions _handle_stop has been called already. // if use asio::dispatch, session's _handle_stop maybe called first. asio::post(this->derived().io_->context(), make_allocator(this->derived().wallocator(), [this, ec, this_ptr = std::move(this_ptr)]() mutable { state_t expected = state_t::stopping; if (this->state_.compare_exchange_strong(expected, state_t::stopped)) { this->derived()._handle_stop(ec, std::move(this_ptr)); } else { ASIO2_ASSERT(false); } })); } inline void _handle_stop(const error_code& ec, std::shared_ptr this_ptr) { set_last_error(ec); this->derived()._fire_stop(); detail::cancel_timer(*(this->acceptor_timer_)); detail::cancel_timer(*(this->counter_timer_)); // call the base class stop function super::stop(); error_code ec_ignore{}; // call acceptor's close function to notify the _handle_accept // function response with error > 0 , then the listen socket // can get notify to exit must ensure the close function has // been called,otherwise the _handle_accept will never return this->acceptor_->cancel(ec_ignore); this->acceptor_->close(ec_ignore); ASIO2_ASSERT(this->state_ == state_t::stopped); } template inline std::shared_ptr _make_session(Args&&... args) { // skip zero io, the 0 io is used for acceptor. // but if the iopool size is 1, this io will be the zero io forever. std::shared_ptr iot; if (this->iots_.size() > std::size_t(1)) { iot = this->_get_io(); if (iot == this->_get_io(0)) iot = this->_get_io(); } else { iot = this->_get_io(); } return std::make_shared(std::forward(args)..., this->sessions_, this->listener_, std::move(iot), this->init_buffer_size_, this->max_buffer_size_); } template inline void _post_accept(std::shared_ptr this_ptr, std::shared_ptr> ecs) { ASIO2_ASSERT(this->derived().io_->running_in_this_thread()); if (!this->derived().is_started()) return; std::shared_ptr session_ptr = this->derived()._make_session(); #if defined(_DEBUG) || defined(DEBUG) ASIO2_ASSERT(this->derived().post_recv_counter_.load() == 0); this->derived().post_recv_counter_++; #endif asio::io_context& ex = session_ptr->io_->context(); this->acceptor_->async_accept(ex, make_allocator(this->rallocator_, [this, sptr = std::move(session_ptr), this_ptr = std::move(this_ptr), ecs = std::move(ecs)] (const error_code& ec, asio::ip::tcp::socket peer) mutable { #if defined(_DEBUG) || defined(DEBUG) this->derived().post_recv_counter_--; #endif sptr->socket().lowest_layer() = std::move(peer); this->derived()._handle_accept(ec, std::move(sptr), std::move(this_ptr), std::move(ecs)); })); } template inline void _handle_accept( const error_code& ec, std::shared_ptr session_ptr, std::shared_ptr this_ptr, std::shared_ptr> ecs) { set_last_error(ec); // if the acceptor status is closed,don't call _post_accept again. if (ec == asio::error::operation_aborted) return; if (!this->derived().is_started()) return; session_ptr->counter_ptr_ = this->counter_ptr_; session_ptr->start(detail::to_shared_ptr(ecs->clone())); // handle exception, may be is the exception "Too many open files" (exception code : 24) // asio::error::no_descriptors - Too many open files if (ec) { ASIO2_LOG_ERROR("Error occurred when accept:{} {}", ec.value(), ec.message()); this->acceptor_timer_->expires_after(std::chrono::seconds(1)); this->acceptor_timer_->async_wait( [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)] (const error_code& ec) mutable { if (ec == asio::error::operation_aborted) return; asio::post(this->io_->context(), make_allocator(this->wallocator(), [this, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]() mutable { this->derived()._post_accept(std::move(this_ptr), std::move(ecs)); })); }); return; } this->derived()._post_accept(std::move(this_ptr), std::move(ecs)); } inline void _fire_init() { // the _fire_init must be executed in the thread 0. ASIO2_ASSERT(this->derived().io_->running_in_this_thread()); ASIO2_ASSERT(!get_last_error()); this->listener_.notify(event_type::init); } inline void _fire_start() { // the _fire_start must be executed in the thread 0. ASIO2_ASSERT(this->derived().io_->running_in_this_thread()); #if defined(_DEBUG) || defined(DEBUG) ASIO2_ASSERT(this->is_stop_called_ == false); #endif this->listener_.notify(event_type::start); } inline void _fire_stop() { // the _fire_stop must be executed in the thread 0. ASIO2_ASSERT(this->derived().io_->running_in_this_thread()); #if defined(_DEBUG) || defined(DEBUG) this->is_stop_called_ = true; #endif this->listener_.notify(event_type::stop); } protected: /// acceptor to accept client connection std::unique_ptr acceptor_; /// timer for acceptor exception, like the exception "Too many open files" (exception code : 24) std::unique_ptr acceptor_timer_; /// used to hold the acceptor io_context util all sessions are closed already. std::unique_ptr counter_timer_; std::size_t init_buffer_size_ = tcp_frame_size; std::size_t max_buffer_size_ = max_buffer_size; #if defined(_DEBUG) || defined(DEBUG) bool is_stop_called_ = false; #endif }; } namespace asio2 { template using tcp_server_impl_t = detail::tcp_server_impl_t; /** * @throws constructor maybe throw exception "Too many open files" (exception code : 24) * asio::error::no_descriptors - Too many open files */ template class tcp_server_t : public detail::tcp_server_impl_t, session_t> { public: using detail::tcp_server_impl_t, session_t>::tcp_server_impl_t; }; /** * @brief tcp server * If this object is created as a shared_ptr like std::shared_ptr server; * you must call the server->stop() manual when exit, otherwise maybe cause memory leaks. * @throws constructor maybe throw exception "Too many open files" (exception code : 24) * asio::error::no_descriptors - Too many open files */ using tcp_server = tcp_server_t; } #if defined(ASIO2_INCLUDE_RATE_LIMIT) #include namespace asio2 { template class tcp_rate_server_t : public asio2::tcp_server_impl_t, session_t> { public: using asio2::tcp_server_impl_t, session_t>::tcp_server_impl_t; }; using tcp_rate_server = tcp_rate_server_t; } #endif #include #endif // !__ASIO2_TCP_SERVER_HPP__