/* * Copyright (c) 2017-2023 zhllxt * * author : zhllxt * email : 37792738@qq.com * * Distributed under the Boost Software License, Version 1.0. (See accompanying * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) */ #ifndef __ASIO2_WS_SESSION_HPP__ #define __ASIO2_WS_SESSION_HPP__ #if defined(_MSC_VER) && (_MSC_VER >= 1200) #pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include #include #include #include #include #include namespace asio2::detail { struct template_args_ws_session : public template_args_tcp_session { using stream_t = websocket::stream; using body_t = http::string_body; using buffer_t = beast::flat_buffer; }; ASIO2_CLASS_FORWARD_DECLARE_BASE; ASIO2_CLASS_FORWARD_DECLARE_TCP_BASE; ASIO2_CLASS_FORWARD_DECLARE_TCP_SERVER; ASIO2_CLASS_FORWARD_DECLARE_TCP_SESSION; template class ws_session_impl_t : public tcp_session_impl_t , public ws_stream_cp , public ws_send_op { ASIO2_CLASS_FRIEND_DECLARE_BASE; ASIO2_CLASS_FRIEND_DECLARE_TCP_BASE; ASIO2_CLASS_FRIEND_DECLARE_TCP_SERVER; ASIO2_CLASS_FRIEND_DECLARE_TCP_SESSION; public: using super = tcp_session_impl_t; using self = ws_session_impl_t ; using args_type = args_t; using key_type = std::size_t; using body_type = typename args_t::body_t; using buffer_type = typename args_t::buffer_t; using ws_stream_comp = ws_stream_cp; using super::send; using super::async_send; public: /** * @brief constructor */ explicit ws_session_impl_t( session_mgr_t & sessions, listener_t & listener, std::shared_ptr rwio, std::size_t init_buf_size, std::size_t max_buf_size ) : super(sessions, listener, std::move(rwio), init_buf_size, max_buf_size) , ws_stream_cp() , ws_send_op () { } /** * @brief destructor */ ~ws_session_impl_t() { } /** * @brief destroy the content of all member variables, this is used for solve the memory leaks. * After this function is called, this class object cannot be used again. */ inline void destroy() { derived_t& derive = this->derived(); derive.ws_stream_.reset(); super::destroy(); } /** * @brief return the websocket stream object reference */ inline typename args_t::stream_t & stream() noexcept { return this->derived().ws_stream(); } /** * @brief return the websocket stream object reference */ inline typename args_t::stream_t const& stream() const noexcept { return this->derived().ws_stream(); } public: /** * @brief get this object hash key,used for session map */ inline key_type hash_key() const noexcept { return reinterpret_cast(this); } /** * @brief get the websocket upgraged request object */ inline websocket::request_type& get_upgrade_request() noexcept { return this->upgrade_req_; } /** * @brief get the websocket upgraged request object */ inline const websocket::request_type& get_upgrade_request() const noexcept { return this->upgrade_req_; } protected: inline typename super::socket_type& upgrade_stream() noexcept { return this->socket(); } inline typename super::socket_type const& upgrade_stream() const noexcept { return this->socket(); } template inline void _do_init(std::shared_ptr& this_ptr, std::shared_ptr>& ecs) { super::_do_init(this_ptr, ecs); this->derived()._ws_init(ecs, this->derived().socket()); } template inline void _post_shutdown(const error_code& ec, std::shared_ptr this_ptr, DeferEvent chain) { ASIO2_LOG_DEBUG("ws_session::_post_shutdown: {} {}", ec.value(), ec.message()); this->derived()._ws_stop(this_ptr, defer_event { [this, ec, this_ptr, e = chain.move_event()] (event_queue_guard g) mutable { super::_post_shutdown(ec, std::move(this_ptr), defer_event(std::move(e), std::move(g))); }, chain.move_guard() }); } template inline void _handle_connect( const error_code& ec, std::shared_ptr this_ptr, std::shared_ptr> ecs, DeferEvent chain) { detail::ignore_unused(ec); derived_t& derive = this->derived(); ASIO2_ASSERT(!ec); ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread()); asio::dispatch(this->io_->context(), make_allocator(this->wallocator_, [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)] () mutable { derive._ws_start(this_ptr, ecs, derive.socket()); derive._post_read_upgrade_request(std::move(this_ptr), std::move(ecs), std::move(chain)); })); } template inline bool _do_send(Data& data, Callback&& callback) { return this->derived()._ws_send(data, std::forward(callback)); } protected: template inline void _post_recv(std::shared_ptr this_ptr, std::shared_ptr> ecs) { this->derived()._ws_post_recv(std::move(this_ptr), std::move(ecs)); } template inline void _handle_recv( const error_code& ec, std::size_t bytes_recvd, std::shared_ptr this_ptr, std::shared_ptr> ecs) { this->derived()._ws_handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs)); } inline void _fire_upgrade(std::shared_ptr& this_ptr) { // the _fire_upgrade must be executed in the thread 0. ASIO2_ASSERT(this->sessions_.io_->running_in_this_thread()); this->listener_.notify(event_type::upgrade, this_ptr); } protected: websocket::request_type upgrade_req_; }; } namespace asio2 { using ws_session_args = detail::template_args_ws_session; template using ws_session_impl_t = detail::ws_session_impl_t; template class ws_session_t : public detail::ws_session_impl_t { public: using detail::ws_session_impl_t::ws_session_impl_t; }; class ws_session : public ws_session_t { public: using ws_session_t::ws_session_t; }; } #if defined(ASIO2_INCLUDE_RATE_LIMIT) #include namespace asio2 { struct ws_rate_session_args : public ws_session_args { using socket_t = asio2::tcp_stream; using stream_t = websocket::stream; }; template class ws_rate_session_t : public asio2::ws_session_impl_t { public: using asio2::ws_session_impl_t::ws_session_impl_t; }; class ws_rate_session : public asio2::ws_rate_session_t { public: using asio2::ws_rate_session_t::ws_rate_session_t; }; } #endif #include #endif // !__ASIO2_WS_SESSION_HPP__