close.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  11. #include <boost/beast/websocket/teardown.hpp>
  12. #include <boost/beast/websocket/detail/mask.hpp>
  13. #include <boost/beast/websocket/impl/stream_impl.hpp>
  14. #include <boost/beast/core/async_base.hpp>
  15. #include <boost/beast/core/flat_static_buffer.hpp>
  16. #include <boost/beast/core/stream_traits.hpp>
  17. #include <boost/beast/core/detail/bind_continuation.hpp>
  18. #include <boost/asio/coroutine.hpp>
  19. #include <boost/asio/dispatch.hpp>
  20. #include <boost/throw_exception.hpp>
  21. #include <memory>
  22. namespace boost {
  23. namespace beast {
  24. namespace websocket {
  25. /* Close the WebSocket Connection
  26. This composed operation sends the close frame if it hasn't already
  27. been sent, then reads and discards frames until receiving a close
  28. frame. Finally it invokes the teardown operation to shut down the
  29. underlying connection.
  30. */
  // close_op: the composed operation that implements async_close.
  //
  // State carried across suspensions:
  //   wp_  weak reference to the stream implementation. It is locked on
  //        every resumption; if the implementation is gone the operation
  //        completes with net::error::operation_aborted.
  //   ev_  error produced while parsing a frame header or the peer's
  //        close frame. It is reported to the handler after teardown,
  //        but only when teardown itself did not fail.
  //   fb_  buffer holding the serialized close frame, obtained through
  //        beast::allocate_stable on this operation (the operation is
  //        moved at every yield, so the buffer is referenced, not owned
  //        by value).
  31. template<class NextLayer, bool deflateSupported>
  32. template<class Handler>
  33. class stream<NextLayer, deflateSupported>::close_op
  34. : public beast::stable_async_base<
  35. Handler, beast::executor_type<stream>>
  36. , public asio::coroutine
  37. {
  38. boost::weak_ptr<impl_type> wp_;
  39. error_code ev_;
  40. detail::frame_buffer& fb_;
  41. public:
  42. static constexpr int id = 5; // for soft_mutex
      // Construct the operation and immediately start it.
      //   h   completion handler, forwarded to the base class
      //   sp  stream implementation; only a weak reference is retained
      //   cr  close reason serialized into the close frame
  43. template<class Handler_>
  44. close_op(
  45. Handler_&& h,
  46. boost::shared_ptr<impl_type> const& sp,
  47. close_reason const& cr)
  48. : stable_async_base<Handler,
  49. beast::executor_type<stream>>(
  50. std::forward<Handler_>(h),
  51. sp->stream().get_executor())
  52. , wp_(sp)
  53. , fb_(beast::allocate_stable<
  54. detail::frame_buffer>(*this))
  55. {
  56. // Serialize the close frame
  57. sp->template write_close<
  58. flat_static_buffer_base>(fb_, cr);
      // First entry into the coroutine; cont = false is what gets
      // forwarded to complete() if the operation finishes without
      // ever suspending.
  59. (*this)({}, 0, false);
  60. }
      // Coroutine body; also serves as the completion handler for every
      // intermediate asynchronous step.
      //   ec                 result of the step that resumed us
      //   bytes_transferred  bytes read by the last async_read_some
      //                      (committed to impl.rd_buf in the read loops)
      //   cont               forwarded to complete()
  61. void
  62. operator()(
  63. error_code ec = {},
  64. std::size_t bytes_transferred = 0,
  65. bool cont = true)
  66. {
  67. using beast::detail::clamp;
      // The stream implementation may have been destroyed while we
      // were suspended; if so, finish with operation_aborted.
  68. auto sp = wp_.lock();
  69. if(! sp)
  70. {
  71. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  72. return this->complete(cont, ec);
  73. }
  74. auto& impl = *sp;
  75. BOOST_ASIO_CORO_REENTER(*this)
  76. {
  77. // Acquire the write lock
  78. if(! impl.wr_block.try_lock(this))
  79. {
      // The write block is held by another operation: park this
      // operation in impl.op_close, accepting every cancellation
      // type while parked.
  80. BOOST_ASIO_CORO_YIELD
  81. {
  82. BOOST_ASIO_HANDLER_LOCATION((
  83. __FILE__, __LINE__,
  84. "websocket::async_close"));
  85. this->set_allowed_cancellation(net::cancellation_type::all);
  86. impl.op_close.emplace(std::move(*this),
  87. net::cancellation_type::all);
  88. }
  89. // cancel fired before we could do anything.
  90. if (ec == net::error::operation_aborted)
  91. return this->complete(cont, ec);
      // From here on only terminal cancellation is allowed.
  92. this->set_allowed_cancellation(net::cancellation_type::terminal);
  93. impl.wr_block.lock(this);
      // Re-enter through net::dispatch on the immediate executor
      // before continuing with the write block held.
  94. BOOST_ASIO_CORO_YIELD
  95. {
  96. BOOST_ASIO_HANDLER_LOCATION((
  97. __FILE__, __LINE__,
  98. "websocket::async_close"));
  99. const auto ex = this->get_immediate_executor();
  100. net::dispatch(ex, std::move(*this));
  101. }
  102. BOOST_ASSERT(impl.wr_block.is_locked(this));
  103. }
  104. if(impl.check_stop_now(ec))
  105. goto upcall;
  106. // Can't call close twice
  107. // TODO return a custom error code
  108. BOOST_ASSERT(! impl.wr_close);
  109. // Send close frame
  110. impl.wr_close = true;
  111. impl.change_status(status::closing);
  112. impl.update_timer(this->get_executor());
  113. BOOST_ASIO_CORO_YIELD
  114. {
  115. BOOST_ASIO_HANDLER_LOCATION((
  116. __FILE__, __LINE__,
  117. "websocket::async_close"));
  118. net::async_write(impl.stream(), fb_.data(),
  119. beast::detail::bind_continuation(std::move(*this)));
  120. }
  121. if(impl.check_stop_now(ec))
  122. goto upcall;
  123. if(impl.rd_close)
  124. {
  125. // This happens when the read_op gets a close frame
  126. // at the same time close_op is sending the close frame.
  127. // The read_op will be suspended on the write block.
  128. goto teardown;
  129. }
  130. // Acquire the read lock
  131. if(! impl.rd_block.try_lock(this))
  132. {
      // The read block is held by another operation: park this
      // operation in impl.op_r_close until it can be resumed.
  133. BOOST_ASIO_CORO_YIELD
  134. {
  135. BOOST_ASIO_HANDLER_LOCATION((
  136. __FILE__, __LINE__,
  137. "websocket::async_close"));
  138. // terminal only, that's the default
  139. impl.op_r_close.emplace(std::move(*this));
  140. }
  141. if (ec == net::error::operation_aborted)
  142. {
  143. // if a cancellation fires here, we do a dirty shutdown
  144. impl.change_status(status::closed);
  145. close_socket(get_lowest_layer(impl.stream()));
  146. return this->complete(cont, ec);
  147. }
  148. impl.rd_block.lock(this);
  149. BOOST_ASIO_CORO_YIELD
  150. {
  151. BOOST_ASIO_HANDLER_LOCATION((
  152. __FILE__, __LINE__,
  153. "websocket::async_close"));
  154. const auto ex = this->get_immediate_executor();
  155. net::dispatch(ex, std::move(*this));
  156. }
  157. BOOST_ASSERT(impl.rd_block.is_locked(this));
  158. if(impl.check_stop_now(ec))
  159. goto upcall;
  160. BOOST_ASSERT(! impl.rd_close);
  161. }
  162. // Read until receiving a close frame
  163. // TODO There should be a timeout on this
      // A non-zero rd_remain is handled by jumping straight into the
      // payload-discarding part of the loop below (presumably the
      // remainder of a message frame whose header was already parsed).
  164. if(impl.rd_remain > 0)
  165. goto read_payload;
  166. for(;;)
  167. {
  168. // Read frame header
  169. while(! impl.parse_fh(
  170. impl.rd_fh, impl.rd_buf, ev_))
  171. {
      // A header parse error is remembered in ev_ and the
      // connection is torn down; ev_ is reported afterwards.
  172. if(ev_)
  173. goto teardown;
  174. BOOST_ASIO_CORO_YIELD
  175. {
  176. BOOST_ASIO_HANDLER_LOCATION((
  177. __FILE__, __LINE__,
  178. "websocket::async_close"));
  179. impl.stream().async_read_some(
  180. impl.rd_buf.prepare(read_size(
  181. impl.rd_buf, impl.rd_buf.max_size())),
  182. beast::detail::bind_continuation(std::move(*this)));
  183. }
  184. impl.rd_buf.commit(bytes_transferred);
  185. if(impl.check_stop_now(ec)) //< this catches cancellation
  186. goto upcall;
  187. }
  188. if(detail::is_control(impl.rd_fh.op))
  189. {
  190. // Discard ping or pong frame
  191. if(impl.rd_fh.op != detail::opcode::close)
  192. {
  193. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  194. continue;
  195. }
  196. // Process close frame
  197. // TODO Should we invoke the control callback?
  198. BOOST_ASSERT(! impl.rd_close);
  199. impl.rd_close = true;
      // View of the close frame payload inside rd_buf; it is
      // unmasked in place when the frame header says it is masked.
  200. auto const mb = buffers_prefix(
  201. clamp(impl.rd_fh.len),
  202. impl.rd_buf.data());
  203. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  204. detail::mask_inplace(mb, impl.rd_key);
      // Decode the peer's close reason into impl.cr.
  205. detail::read_close(impl.cr, mb, ev_);
  206. if(ev_)
  207. goto teardown;
  208. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  209. goto teardown;
  210. }
  211. read_payload:
  212. // Discard message frame
      // Drop whatever is buffered and keep reading until the buffer
      // holds at least the rest of the payload.
  213. while(impl.rd_buf.size() < impl.rd_remain)
  214. {
  215. impl.rd_remain -= impl.rd_buf.size();
  216. impl.rd_buf.consume(impl.rd_buf.size());
  217. BOOST_ASIO_CORO_YIELD
  218. {
  219. BOOST_ASIO_HANDLER_LOCATION((
  220. __FILE__, __LINE__,
  221. "websocket::async_close"));
  222. impl.stream().async_read_some(
  223. impl.rd_buf.prepare(read_size(
  224. impl.rd_buf, impl.rd_buf.max_size())),
  225. beast::detail::bind_continuation(std::move(*this)));
  226. }
  227. impl.rd_buf.commit(bytes_transferred);
  228. if(impl.check_stop_now(ec))
  229. goto upcall;
  230. }
  231. BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
  232. impl.rd_buf.consume(clamp(impl.rd_remain));
  233. impl.rd_remain = 0;
  234. }
  235. teardown:
  236. // Teardown
  237. BOOST_ASSERT(impl.wr_block.is_locked(this));
  238. using beast::websocket::async_teardown;
  239. BOOST_ASIO_CORO_YIELD
  240. {
  241. BOOST_ASIO_HANDLER_LOCATION((
  242. __FILE__, __LINE__,
  243. "websocket::async_close"));
  244. async_teardown(impl.role, impl.stream(),
  245. beast::detail::bind_continuation(std::move(*this)));
  246. }
  247. BOOST_ASSERT(impl.wr_block.is_locked(this));
      // An eof result from teardown is treated as success.
  248. if(ec == net::error::eof)
  249. {
  250. // Rationale:
  251. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  252. ec = {};
  253. }
      // Teardown succeeded: report any error saved while reading
      // frames (ev_ is empty when none occurred).
  254. if(! ec)
  255. {
  256. BOOST_BEAST_ASSIGN_EC(ec, ev_);
  257. }
  258. if(ec)
  259. impl.change_status(status::failed);
  260. else
  261. impl.change_status(status::closed);
  262. impl.close();
  263. upcall:
      // Release the write block. Release the read block too if this
      // operation holds it, and only in that case try to resume
      // op_r_rd. Then try the remaining parked operations in the
      // order listed; the || chain stops at the first maybe_invoke()
      // that evaluates to true.
  264. impl.wr_block.unlock(this);
  265. impl.rd_block.try_unlock(this)
  266. && impl.op_r_rd.maybe_invoke();
  267. impl.op_rd.maybe_invoke()
  268. || impl.op_idle_ping.maybe_invoke()
  269. || impl.op_ping.maybe_invoke()
  270. || impl.op_wr.maybe_invoke();
  271. this->complete(cont, ec);
  272. }
  273. }
  274. };
  275. template<class NextLayer, bool deflateSupported>
  276. struct stream<NextLayer, deflateSupported>::
  277. run_close_op
  278. {
  279. boost::shared_ptr<impl_type> const& self;
  280. using executor_type = typename stream::executor_type;
  281. executor_type
  282. get_executor() const noexcept
  283. {
  284. return self->stream().get_executor();
  285. }
  286. template<class CloseHandler>
  287. void
  288. operator()(
  289. CloseHandler&& h,
  290. close_reason const& cr)
  291. {
  292. // If you get an error on the following line it means
  293. // that your handler does not meet the documented type
  294. // requirements for the handler.
  295. static_assert(
  296. beast::detail::is_invocable<CloseHandler,
  297. void(error_code)>::value,
  298. "CloseHandler type requirements not met");
  299. close_op<
  300. typename std::decay<CloseHandler>::type>(
  301. std::forward<CloseHandler>(h),
  302. self,
  303. cr);
  304. }
  305. };
  306. //------------------------------------------------------------------------------
  307. template<class NextLayer, bool deflateSupported>
  308. void
  309. stream<NextLayer, deflateSupported>::
  310. close(close_reason const& cr)
  311. {
  312. static_assert(is_sync_stream<next_layer_type>::value,
  313. "SyncStream type requirements not met");
  314. error_code ec;
  315. close(cr, ec);
  316. if(ec)
  317. BOOST_THROW_EXCEPTION(system_error{ec});
  318. }
  // Synchronous close, error_code overload.
  //
  // Sends a close frame carrying `cr`, then reads and discards incoming
  // frames until a close frame arrives, and finally calls do_fail with
  // error::closed.
  //   cr  close reason serialized into the outgoing close frame
  //   ec  cleared on entry; receives the resulting error, if any. A
  //       final value of error::closed is converted to success.
  319. template<class NextLayer, bool deflateSupported>
  320. void
  321. stream<NextLayer, deflateSupported>::
  322. close(close_reason const& cr, error_code& ec)
  323. {
  324. static_assert(is_sync_stream<next_layer_type>::value,
  325. "SyncStream type requirements not met");
  326. using beast::detail::clamp;
  327. auto& impl = *impl_;
  328. ec = {};
      // Return immediately when check_stop_now reports that the stream
      // must not proceed (it is given ec and may set it).
  329. if(impl.check_stop_now(ec))
  330. return;
  331. BOOST_ASSERT(! impl.rd_close);
  332. // Can't call close twice
  333. // TODO return a custom error code
  334. BOOST_ASSERT(! impl.wr_close);
  335. // Send close frame
  336. {
  337. impl.wr_close = true;
  338. impl.change_status(status::closing);
      // The frame buffer only needs to live for the duration of the
      // blocking write, hence the enclosing scope.
  339. detail::frame_buffer fb;
  340. impl.template write_close<flat_static_buffer_base>(fb, cr);
  341. net::write(impl.stream(), fb.data(), ec);
  342. if(impl.check_stop_now(ec))
  343. return;
  344. }
  345. // Read until receiving a close frame
      // ev collects parse errors separately from the I/O error in ec.
  346. error_code ev;
      // A non-zero rd_remain is handled by jumping straight into the
      // payload-discarding part of the loop below (presumably the
      // remainder of a message frame whose header was already parsed).
  347. if(impl.rd_remain > 0)
  348. goto read_payload;
  349. for(;;)
  350. {
  351. // Read frame header
  352. while(! impl.parse_fh(
  353. impl.rd_fh, impl.rd_buf, ev))
  354. {
  355. if(ev)
  356. {
  357. // Protocol violation
  358. return do_fail(close_code::none, ev, ec);
  359. }
      // Header incomplete: block for more bytes and append them
      // to the read buffer.
  360. impl.rd_buf.commit(impl.stream().read_some(
  361. impl.rd_buf.prepare(read_size(
  362. impl.rd_buf, impl.rd_buf.max_size())), ec));
  363. if(impl.check_stop_now(ec))
  364. return;
  365. }
  366. if(detail::is_control(impl.rd_fh.op))
  367. {
  368. // Discard ping/pong frame
  369. if(impl.rd_fh.op != detail::opcode::close)
  370. {
  371. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  372. continue;
  373. }
  374. // Handle close frame
  375. // TODO Should we invoke the control callback?
  376. BOOST_ASSERT(! impl.rd_close);
  377. impl.rd_close = true;
      // View of the close frame payload inside rd_buf; it is
      // unmasked in place when the frame header says it is masked.
  378. auto const mb = buffers_prefix(
  379. clamp(impl.rd_fh.len),
  380. impl.rd_buf.data());
  381. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  382. detail::mask_inplace(mb, impl.rd_key);
      // Decode the peer's close reason into impl.cr.
  383. detail::read_close(impl.cr, mb, ev);
  384. if(ev)
  385. {
  386. // Protocol violation
  387. return do_fail(close_code::none, ev, ec);
  388. }
  389. impl.rd_buf.consume(clamp(impl.rd_fh.len));
      // Close frame received: leave the read loop.
  390. break;
  391. }
  392. read_payload:
  393. // Discard message frame
      // Drop whatever is buffered and keep reading until the buffer
      // holds at least the rest of the payload.
  394. while(impl.rd_buf.size() < impl.rd_remain)
  395. {
  396. impl.rd_remain -= impl.rd_buf.size();
  397. impl.rd_buf.consume(impl.rd_buf.size());
  398. impl.rd_buf.commit(
  399. impl.stream().read_some(
  400. impl.rd_buf.prepare(
  401. read_size(
  402. impl.rd_buf,
  403. impl.rd_buf.max_size())),
  404. ec));
  405. if(impl.check_stop_now(ec))
  406. return;
  407. }
  408. BOOST_ASSERT(
  409. impl.rd_buf.size() >= impl.rd_remain);
  410. impl.rd_buf.consume(clamp(impl.rd_remain));
  411. impl.rd_remain = 0;
  412. }
  413. // _Close the WebSocket Connection_
      // do_fail is invoked with error::closed; if that is what ends up
      // in ec, the close handshake completed normally and the caller
      // sees success.
  414. do_fail(close_code::none, error::closed, ec);
  415. if(ec == error::closed)
  416. ec = {};
  417. }
  418. template<class NextLayer, bool deflateSupported>
  419. template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
  420. BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
  421. stream<NextLayer, deflateSupported>::
  422. async_close(close_reason const& cr, CloseHandler&& handler)
  423. {
  424. static_assert(is_async_stream<next_layer_type>::value,
  425. "AsyncStream type requirements not met");
  426. return net::async_initiate<
  427. CloseHandler,
  428. void(error_code)>(
  429. run_close_op{impl_},
  430. handler,
  431. cr);
  432. }
  433. } // websocket
  434. } // beast
  435. } // boost
  436. #endif