ws_stream_cp.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_WS_STREAM_COMPONENT_HPP__
  11. #define __ASIO2_WS_STREAM_COMPONENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <memory>
  16. #include <future>
  17. #include <utility>
  18. #include <string_view>
  19. #include <asio2/external/asio.hpp>
  20. #include <asio2/external/beast.hpp>
  21. #include <asio2/base/error.hpp>
  22. #include <asio2/base/detail/util.hpp>
  23. namespace asio2::detail
  24. {
  25. template<class, class = std::void_t<>>
  26. struct is_websocket_client : std::false_type {};
  27. template<class T>
  28. struct is_websocket_client<T, std::void_t<decltype(std::declval<T&>().ws_stream())>> : std::true_type {};
  29. template<class, class = std::void_t<>>
  30. struct is_websocket_server : std::false_type {};
  31. template<class T>
  32. struct is_websocket_server<T, std::void_t<decltype(std::declval<typename T::session_type&>().
  33. ws_stream())>> : std::true_type {};
  34. template<class derived_t, class args_t>
  35. class ws_stream_cp : public detail::ws_stream_tag
  36. {
  37. public:
  38. using ws_stream_type = typename args_t::stream_t;
  39. /**
  40. * @brief constructor
  41. */
  42. ws_stream_cp() {}
  43. /**
  44. * @brief destructor
  45. */
  46. ~ws_stream_cp() noexcept {}
  47. /**
  48. * @brief get the websocket stream object reference
  49. */
  50. inline ws_stream_type & ws_stream() noexcept
  51. {
  52. ASIO2_ASSERT(bool(this->ws_stream_));
  53. return (*(this->ws_stream_));
  54. }
  55. /**
  56. * @brief get the websocket stream object reference
  57. */
  58. inline ws_stream_type const& ws_stream() const noexcept
  59. {
  60. ASIO2_ASSERT(bool(this->ws_stream_));
  61. return (*(this->ws_stream_));
  62. }
  63. protected:
  64. template<typename C, typename Socket>
  65. inline void _ws_init(std::shared_ptr<ecs_t<C>>& ecs, Socket& socket)
  66. {
  67. derived_t& derive = static_cast<derived_t&>(*this);
  68. detail::ignore_unused(derive, ecs, socket);
  69. // In previous versions, "_ws_init" maybe called multi times, when "_ws_init" was called,
  70. // the "_ws_stop" maybe called at the same time, but "_ws_init" and "_ws_stop" was called
  71. // at different threads, this will cause the diffrent threads "read, write" the "ws_stream_"
  72. // variable, and then cause crash.
  73. if constexpr (args_t::is_client)
  74. {
  75. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  76. }
  77. else
  78. {
  79. ASIO2_ASSERT(derive.sessions_.io_->running_in_this_thread());
  80. }
  81. this->ws_stream_ = std::make_unique<ws_stream_type>(socket);
  82. websocket::stream_base::timeout opt{};
  83. // Set suggested timeout settings for the websocket
  84. if constexpr (args_t::is_session)
  85. {
  86. opt = websocket::stream_base::timeout::suggested(beast::role_type::server);
  87. }
  88. else
  89. {
  90. opt = websocket::stream_base::timeout::suggested(beast::role_type::client);
  91. }
  92. opt.handshake_timeout = derive.get_connect_timeout();
  93. this->ws_stream_->set_option(opt);
  94. }
  95. template<typename C, typename Socket>
  96. inline void _ws_start(std::shared_ptr<derived_t>& this_ptr, std::shared_ptr<ecs_t<C>>& ecs, Socket& socket)
  97. {
  98. derived_t& derive = static_cast<derived_t&>(*this);
  99. detail::ignore_unused(derive, this_ptr, ecs, socket);
  100. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  101. }
  102. template<typename DeferEvent>
  103. inline void _ws_stop(std::shared_ptr<derived_t> this_ptr, DeferEvent chain)
  104. {
  105. derived_t& derive = static_cast<derived_t&>(*this);
  106. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  107. if (!this->ws_stream_)
  108. return;
  109. // bug fixed : resolve beast::websocket::detail::soft_mutex
  110. // BHO_ASSERT(id_ != T::id); failed (line 89).
  111. //
  112. // If this assert goes off it means you are attempting to
  113. // simultaneously initiate more than one of same asynchronous
  114. // operation, which is not allowed. For example, you must wait
  115. // for an async_read to complete before performing another
  116. // async_read.
  117. derive.disp_event([this, &derive, this_ptr = std::move(this_ptr), e = chain.move_event()]
  118. (event_queue_guard<derived_t> g) mutable
  119. {
  120. // must construct a new chain
  121. defer_event chain(std::move(e), std::move(g));
  122. // Set the handshake timeout to a small value, otherwise if the remote don't
  123. // send a websocket close frame, the async_close's callback will never be
  124. // called.
  125. websocket::stream_base::timeout opt{};
  126. opt.handshake_timeout = derive.get_disconnect_timeout();
  127. opt.idle_timeout = websocket::stream_base::none();
  128. try
  129. {
  130. derive.ws_stream_->set_option(opt);
  131. // Reset the decorator, beacuse the decorator maybe hold the self
  132. // shared_ptr, if don't reset it, the session ptr maybe can't detroyed.
  133. derive.ws_stream_->set_option(
  134. websocket::stream_base::decorator([](websocket::request_type&) {}));
  135. derive.ws_stream_->set_option(
  136. websocket::stream_base::decorator([](websocket::response_type&) {}));
  137. }
  138. catch (system_error const& e)
  139. {
  140. set_last_error(e);
  141. }
  142. // Can't call close twice
  143. // TODO return a custom error code
  144. // BHO_ASSERT(! impl.wr_close);
  145. if (!this->ws_stream_->is_open())
  146. {
  147. // must Reset the control frame callback. the control frame callback hold the
  148. // self shared_ptr, if don't reset it, will cause memory leaks.
  149. this->ws_stream_->control_callback();
  150. return;
  151. }
  152. #if defined(_DEBUG) || defined(DEBUG)
  153. ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
  154. derive.post_send_counter_++;
  155. #endif
  156. ASIO2_LOG_DEBUG("ws_stream_cp enter async_close");
  157. // Close the WebSocket connection
  158. // async_close behavior :
  159. // send a websocket close frame to the remote, and wait for recv a websocket close
  160. // frame from the remote.
  161. // async_close maybe close the socket directly.
  162. this->ws_stream_->async_close(websocket::close_code::normal,
  163. [&derive, this_ptr = std::move(this_ptr), chain = std::move(chain)]
  164. (error_code ec) mutable
  165. {
  166. #if defined(_DEBUG) || defined(DEBUG)
  167. derive.post_send_counter_--;
  168. #endif
  169. detail::ignore_unused(derive, ec);
  170. ASIO2_LOG_DEBUG("ws_stream_cp leave async_close:{} {}", ec.value(), ec.message());
  171. // if async close failed, the inner timer of async close will exists for timeout,
  172. // it will cause the ws client can't be exited, so we reset the timeout to none
  173. // to notify the timer to exit.
  174. if (ec)
  175. {
  176. websocket::stream_base::timeout opt{};
  177. opt.handshake_timeout = websocket::stream_base::none();
  178. opt.idle_timeout = websocket::stream_base::none();
  179. try
  180. {
  181. derive.ws_stream_->set_option(opt);
  182. }
  183. catch (system_error const&)
  184. {
  185. }
  186. }
  187. //if (ec)
  188. // return;
  189. // If we get here then the connection is closed gracefully
  190. // must Reset the control frame callback. the control frame callback hold the
  191. // self shared_ptr, if don't reset it, will cause memory leaks.
  192. derive.ws_stream_->control_callback();
  193. });
  194. }, chain.move_guard());
  195. }
  196. template<typename C>
  197. inline void _ws_post_recv(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  198. {
  199. derived_t& derive = static_cast<derived_t&>(*this);
  200. if (!derive.is_started())
  201. {
  202. if (derive.state_ == state_t::started)
  203. {
  204. derive._do_disconnect(asio2::get_last_error(), std::move(this_ptr));
  205. }
  206. return;
  207. }
  208. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  209. ASIO2_ASSERT(bool(this->ws_stream_));
  210. #if defined(_DEBUG) || defined(DEBUG)
  211. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  212. derive.post_recv_counter_++;
  213. #endif
  214. ASIO2_ASSERT(derive.reading_ == false);
  215. derive.reading_ = true;
  216. // Read a message into our buffer
  217. this->ws_stream_->async_read(derive.buffer().base(), make_allocator(derive.rallocator(),
  218. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]
  219. (const error_code& ec, std::size_t bytes_recvd) mutable
  220. {
  221. #if defined(_DEBUG) || defined(DEBUG)
  222. derive.post_recv_counter_--;
  223. #endif
  224. derive.reading_ = false;
  225. derive._handle_recv(ec, bytes_recvd, std::move(this_ptr), std::move(ecs));
  226. }));
  227. }
  228. template<typename C>
  229. void _ws_handle_recv(
  230. const error_code& ec, std::size_t bytes_recvd,
  231. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  232. {
  233. derived_t& derive = static_cast<derived_t&>(*this);
  234. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  235. set_last_error(ec);
  236. if (!derive.is_started())
  237. {
  238. if (derive.state_ == state_t::started)
  239. {
  240. ASIO2_LOG_INFOR("_ws_handle_recv with closed socket: {} {}", ec.value(), ec.message());
  241. derive._do_disconnect(ec, this_ptr);
  242. }
  243. derive._stop_readend_timer(std::move(this_ptr));
  244. return;
  245. }
  246. // bytes_recvd : The number of bytes in the streambuf's get area up to and including the delimiter.
  247. if (!ec)
  248. {
  249. // every times recv data,we update the last alive time.
  250. derive.update_alive_time();
  251. derive._fire_recv(this_ptr, ecs, std::string_view(reinterpret_cast<
  252. std::string_view::const_pointer>(derive.buffer().data().data()), bytes_recvd));
  253. derive.buffer().consume(bytes_recvd);
  254. derive._post_recv(std::move(this_ptr), std::move(ecs));
  255. }
  256. else
  257. {
  258. ASIO2_LOG_DEBUG("_ws_handle_recv with error: {} {}", ec.value(), ec.message());
  259. derive._do_disconnect(ec, this_ptr);
  260. derive._stop_readend_timer(std::move(this_ptr));
  261. }
  262. // If an error occurs then no new asynchronous operations are started. This
  263. // means that all shared_ptr references to the connection object will
  264. // disappear and the object will be destroyed automatically after this
  265. // handler returns. The connection class's destructor closes the socket.
  266. }
  267. template<typename C>
  268. inline void _post_control_callback(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  269. {
  270. derived_t& derive = static_cast<derived_t&>(*this);
  271. ASIO2_ASSERT(bool(this->ws_stream_));
  272. // Set the control callback. This will be called
  273. // on every incoming ping, pong, and close frame.
  274. // can't use push_event, just only use asio::post, beacuse the callback handler
  275. // will not be called immediately, it is called only when it receives an event
  276. // on the another side.
  277. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  278. [this, &derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs)]() mutable
  279. {
  280. // Because hole this_ptr in control callback can easily cause circular references,
  281. // Once forget to call control callback with empty param or due to exceptions or
  282. // other reasons, will result in a circular reference, so we hold the weak ptr
  283. // in the control callback instead of shared ptr.
  284. std::weak_ptr<derived_t> this_wptr{ this_ptr };
  285. bool check_alive = (this_ptr != nullptr);
  286. this->ws_stream_->control_callback(
  287. [&derive, this_wptr = std::move(this_wptr), ecs = std::move(ecs), check_alive]
  288. (websocket::frame_type kind, beast::string_view payload) mutable
  289. {
  290. std::shared_ptr<derived_t> this_ptr = this_wptr.lock();
  291. // If the original shared ptr is not empty, but now it is empty, it means that
  292. // the object is destroyed, so we need return.
  293. // But this situation shouldn't happen.
  294. if (check_alive && this_ptr == nullptr)
  295. {
  296. ASIO2_ASSERT(false);
  297. return;
  298. }
  299. derive._handle_control_callback(kind, payload, std::move(this_ptr), ecs);
  300. });
  301. }));
  302. }
  303. template<typename C>
  304. inline void _handle_control_callback(
  305. websocket::frame_type kind, beast::string_view payload,
  306. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  307. {
  308. derived_t& derive = static_cast<derived_t&>(*this);
  309. detail::ignore_unused(payload, this_ptr, ecs);
  310. // Note that the connection is alive
  311. derive.update_alive_time();
  312. switch (kind)
  313. {
  314. case websocket::frame_type::ping:
  315. derive._handle_control_ping(payload, std::move(this_ptr), std::move(ecs));
  316. break;
  317. case websocket::frame_type::pong:
  318. derive._handle_control_pong(payload, std::move(this_ptr), std::move(ecs));
  319. break;
  320. case websocket::frame_type::close:
  321. derive._handle_control_close(payload, std::move(this_ptr), std::move(ecs));
  322. break;
  323. default:break;
  324. }
  325. }
  326. template<typename C>
  327. inline void _handle_control_ping(
  328. beast::string_view payload, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  329. {
  330. detail::ignore_unused(payload, this_ptr, ecs);
  331. }
  332. template<typename C>
  333. inline void _handle_control_pong(
  334. beast::string_view payload, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  335. {
  336. detail::ignore_unused(payload, this_ptr, ecs);
  337. }
  338. template<typename C>
  339. inline void _handle_control_close(
  340. beast::string_view payload, std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs)
  341. {
  342. derived_t& derive = static_cast<derived_t&>(*this);
  343. detail::ignore_unused(payload, this_ptr, ecs);
  344. if (derive.state_ == state_t::started)
  345. {
  346. ASIO2_LOG_DEBUG("ws_stream_cp::_handle_control_close _do_disconnect");
  347. derive._do_disconnect(websocket::error::closed, std::move(this_ptr));
  348. }
  349. }
  350. template<typename C, typename DeferEvent>
  351. inline void _post_read_upgrade_request(
  352. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  353. {
  354. derived_t& derive = static_cast<derived_t&>(*this);
  355. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  356. // Make the request empty before reading,
  357. // otherwise the operation behavior is undefined.
  358. derive.upgrade_req_ = {};
  359. #if defined(_DEBUG) || defined(DEBUG)
  360. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  361. derive.post_recv_counter_++;
  362. #endif
  363. ASIO2_ASSERT(derive.reading_ == false);
  364. derive.reading_ = true;
  365. // Read a request
  366. http::async_read(derive.upgrade_stream(), derive.buffer().base(), derive.upgrade_req_,
  367. make_allocator(derive.rallocator(),
  368. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  369. (const error_code & ec, [[maybe_unused]] std::size_t bytes_recvd) mutable
  370. {
  371. #if defined(_DEBUG) || defined(DEBUG)
  372. derive.post_recv_counter_--;
  373. #endif
  374. derive.reading_ = false;
  375. derive._handle_read_upgrade_request(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  376. }));
  377. }
  378. template<typename C, typename DeferEvent>
  379. inline void _handle_read_upgrade_request(
  380. const error_code& ec,
  381. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  382. {
  383. derived_t& derive = static_cast<derived_t&>(*this);
  384. ASIO2_ASSERT(derive.io_->running_in_this_thread());
  385. set_last_error(ec);
  386. if (!ec)
  387. {
  388. // every times recv data,we update the last alive time.
  389. derive.update_alive_time();
  390. derive._post_control_callback(this_ptr, ecs);
  391. derive._post_upgrade(
  392. std::move(this_ptr), std::move(ecs), derive.upgrade_req_, std::move(chain));
  393. }
  394. else
  395. {
  396. derive._handle_upgrade(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  397. }
  398. }
  399. template<typename C, typename DeferEvent, typename Response, bool IsSession = args_t::is_session>
  400. typename std::enable_if_t<!IsSession, void>
  401. inline _post_upgrade(
  402. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, Response& rep, DeferEvent chain)
  403. {
  404. derived_t& derive = static_cast<derived_t&>(*this);
  405. ASIO2_ASSERT(bool(this->ws_stream_));
  406. #if defined(_DEBUG) || defined(DEBUG)
  407. ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
  408. derive.post_send_counter_++;
  409. #endif
  410. // Perform the websocket handshake
  411. this->ws_stream_->async_handshake(rep, derive.host_, derive.get_upgrade_target(),
  412. make_allocator(derive.wallocator(),
  413. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  414. (error_code const& ec) mutable
  415. {
  416. #if defined(_DEBUG) || defined(DEBUG)
  417. derive.post_send_counter_--;
  418. #endif
  419. derive._handle_upgrade(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  420. }));
  421. }
  422. template<typename C, typename DeferEvent, typename Request, bool IsSession = args_t::is_session>
  423. typename std::enable_if_t<IsSession, void>
  424. inline _post_upgrade(
  425. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, Request const& req, DeferEvent chain)
  426. {
  427. derived_t& derive = static_cast<derived_t&>(*this);
  428. ASIO2_ASSERT(bool(this->ws_stream_));
  429. #if defined(_DEBUG) || defined(DEBUG)
  430. ASIO2_ASSERT(derive.post_send_counter_.load() == 0);
  431. derive.post_send_counter_++;
  432. #endif
  433. // Accept the websocket handshake
  434. // just write response.
  435. this->ws_stream_->async_accept(req, make_allocator(derive.wallocator(),
  436. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  437. (error_code ec) mutable
  438. {
  439. #if defined(_DEBUG) || defined(DEBUG)
  440. derive.post_send_counter_--;
  441. #endif
  442. derive._handle_upgrade(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  443. }));
  444. }
  445. template<typename C, typename DeferEvent, bool IsSession = args_t::is_session>
  446. typename std::enable_if_t<IsSession, void>
  447. inline _post_upgrade(std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  448. {
  449. derived_t& derive = static_cast<derived_t&>(*this);
  450. ASIO2_ASSERT(bool(this->ws_stream_));
  451. #if defined(_DEBUG) || defined(DEBUG)
  452. ASIO2_ASSERT(derive.post_recv_counter_.load() == 0);
  453. derive.post_recv_counter_++;
  454. #endif
  455. // Accept the websocket handshake
  456. // first read request, then write response.
  457. this->ws_stream_->async_accept(make_allocator(derive.wallocator(),
  458. [&derive, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  459. (error_code ec) mutable
  460. {
  461. #if defined(_DEBUG) || defined(DEBUG)
  462. derive.post_recv_counter_--;
  463. #endif
  464. derive._handle_upgrade(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  465. }));
  466. }
  467. template<typename C, typename DeferEvent>
  468. inline void _session_handle_upgrade(
  469. const error_code& ec,
  470. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  471. {
  472. derived_t& derive = static_cast<derived_t&>(*this);
  473. // Use "sessions_.dispatch" to ensure that the _fire_accept function and the _fire_upgrade
  474. // function are fired in the same thread
  475. derive.sessions_.dispatch(
  476. [&derive, ec, this_ptr = std::move(this_ptr), ecs = std::move(ecs), chain = std::move(chain)]
  477. () mutable
  478. {
  479. set_last_error(ec);
  480. derive._fire_upgrade(this_ptr);
  481. if (ec)
  482. {
  483. derive._do_disconnect(ec, std::move(this_ptr), std::move(chain));
  484. return;
  485. }
  486. derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  487. });
  488. }
  489. template<typename C, typename DeferEvent>
  490. inline void _client_handle_upgrade(
  491. const error_code& ec,
  492. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  493. {
  494. derived_t& derive = static_cast<derived_t&>(*this);
  495. set_last_error(ec);
  496. derive._fire_upgrade(this_ptr);
  497. derive._done_connect(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  498. }
  499. template<typename C, typename DeferEvent>
  500. inline void _handle_upgrade(
  501. const error_code& ec,
  502. std::shared_ptr<derived_t> this_ptr, std::shared_ptr<ecs_t<C>> ecs, DeferEvent chain)
  503. {
  504. derived_t& derive = static_cast<derived_t&>(*this);
  505. ASIO2_ASSERT(bool(this->ws_stream_));
  506. if constexpr (args_t::is_session)
  507. {
  508. derive._session_handle_upgrade(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  509. }
  510. else
  511. {
  512. derive._client_handle_upgrade(ec, std::move(this_ptr), std::move(ecs), std::move(chain));
  513. }
  514. }
  515. protected:
  516. std::unique_ptr<ws_stream_type> ws_stream_;
  517. };
  518. }
  519. #endif // !__ASIO2_WS_STREAM_COMPONENT_HPP__