read.hpp 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  11. #include <boost/beast/core/buffer_traits.hpp>
  12. #include <boost/beast/websocket/teardown.hpp>
  13. #include <boost/beast/websocket/detail/mask.hpp>
  14. #include <boost/beast/websocket/impl/stream_impl.hpp>
  15. #include <boost/beast/core/async_base.hpp>
  16. #include <boost/beast/core/buffers_prefix.hpp>
  17. #include <boost/beast/core/buffers_suffix.hpp>
  18. #include <boost/beast/core/flat_static_buffer.hpp>
  19. #include <boost/beast/core/read_size.hpp>
  20. #include <boost/beast/core/stream_traits.hpp>
  21. #include <boost/beast/core/detail/bind_continuation.hpp>
  22. #include <boost/beast/core/detail/buffer.hpp>
  23. #include <boost/beast/core/detail/clamp.hpp>
  24. #include <boost/beast/core/detail/config.hpp>
  25. #include <boost/asio/coroutine.hpp>
  26. #include <boost/assert.hpp>
  27. #include <boost/config.hpp>
  28. #include <boost/optional.hpp>
  29. #include <boost/throw_exception.hpp>
  30. #include <algorithm>
  31. #include <limits>
  32. #include <memory>
  33. namespace boost {
  34. namespace beast {
  35. namespace websocket {
  36. /* Read some message data into a buffer sequence.
  37. Also reads and handles control frames.
  38. */
  // Composed operation behind "websocket::async_read_some" (the name used in
  // the handler-location markers below).
  //
  // Implemented as a stackless coroutine (asio::coroutine): operator() is
  // re-entered each time an intermediate operation finishes. It reads message
  // payload into the caller's buffer sequence and, along the way, handles
  // ping, pong and close control frames. Only a weak reference to the stream
  // implementation is held; if it has expired on re-entry, the operation
  // completes with net::error::operation_aborted and zero bytes.
  39. template<class NextLayer, bool deflateSupported>
  40. template<class Handler, class MutableBufferSequence>
  41. class stream<NextLayer, deflateSupported>::read_some_op
  42. : public beast::async_base<
  43. Handler, beast::executor_type<stream>>
  44. , public asio::coroutine
  45. {
  // Weak reference to the stream implementation; locked on every re-entry.
  46. boost::weak_ptr<impl_type> wp_;
  // The caller's original buffer sequence (used for the UTF-8 check of
  // inflated output near the end of the deflate path).
  47. MutableBufferSequence bs_;
  // Suffix view over the caller's buffers; consumed as inflated output is
  // produced.
  48. buffers_suffix<MutableBufferSequence> cb_;
  // Payload bytes stored in the caller's buffers so far; passed to the handler.
  49. std::size_t bytes_written_ = 0;
  // Error assigned to `ec` after teardown succeeds (see the `close` label).
  50. error_code result_;
  // Close code written into the close frame sent from the `close` label.
  51. close_code code_;
  // Set once the deflate path has issued a read; it is never reset, so that
  // path reads from the stream at most once per operation.
  52. bool did_read_ = false;
  53. public:
  54. static constexpr int id = 1; // for soft_mutex
  // Stores the handler, the weak implementation reference and the buffers,
  // then starts the operation right away by invoking itself with
  // cont == false.
  55. template<class Handler_>
  56. read_some_op(
  57. Handler_&& h,
  58. boost::shared_ptr<impl_type> const& sp,
  59. MutableBufferSequence const& bs)
  60. : async_base<
  61. Handler, beast::executor_type<stream>>(
  62. std::forward<Handler_>(h),
  63. sp->stream().get_executor())
  64. , wp_(sp)
  65. , bs_(bs)
  66. , cb_(bs)
  67. , code_(close_code::none)
  68. {
  69. (*this)({}, 0, false);
  70. }
  // Coroutine body.
  //
  // ec                - result of the operation that resumed this coroutine.
  // bytes_transferred - byte count reported by that operation.
  // cont              - forwarded to complete(); false only for the initial
  //                     call made from the constructor.
  71. void operator()(
  72. error_code ec = {},
  73. std::size_t bytes_transferred = 0,
  74. bool cont = true)
  75. {
  76. using beast::detail::clamp;
  // The implementation may have been destroyed while this operation was
  // suspended; in that case finish with operation_aborted and zero bytes.
  77. auto sp = wp_.lock();
  78. if(! sp)
  79. {
  80. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  81. bytes_written_ = 0;
  82. return this->complete(cont, ec, bytes_written_);
  83. }
  84. auto& impl = *sp;
  85. BOOST_ASIO_CORO_REENTER(*this)
  86. {
  87. impl.update_timer(this->get_executor());
  88. acquire_read_lock:
  89. // Acquire the read lock
  90. if(! impl.rd_block.try_lock(this))
  91. {
  92. do_suspend:
  // Store this operation in impl.op_r_rd and yield.
  93. BOOST_ASIO_CORO_YIELD
  94. {
  95. BOOST_ASIO_HANDLER_LOCATION((
  96. __FILE__, __LINE__,
  97. "websocket::async_read_some"));
  98. this->set_allowed_cancellation(net::cancellation_type::all);
  99. impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
  100. }
  101. if (ec)
  102. return this->complete(cont, ec, bytes_written_);
  103. this->set_allowed_cancellation(net::cancellation_type::terminal);
  104. impl.rd_block.lock(this);
  // Re-dispatch through the immediate executor before continuing.
  105. BOOST_ASIO_CORO_YIELD
  106. {
  107. BOOST_ASIO_HANDLER_LOCATION((
  108. __FILE__, __LINE__,
  109. "websocket::async_read_some"));
  110. const auto ex = this->get_immediate_executor();
  111. net::dispatch(ex, std::move(*this));
  112. }
  113. BOOST_ASSERT(impl.rd_block.is_locked(this));
  114. BOOST_ASSERT(!ec);
  115. if(impl.check_stop_now(ec))
  116. {
  117. // Issue 2264 - There is no guarantee that the next
  118. // error will be operation_aborted.
  119. // The error could be a result of the peer resetting the
  120. // connection
  121. // BOOST_ASSERT(ec == net::error::operation_aborted);
  122. goto upcall;
  123. }
  124. // VFALCO Should never get here
  125. // The only way to get read blocked is if
  126. // a `close_op` wrote a close frame
  127. BOOST_ASSERT(impl.wr_close);
  128. BOOST_ASSERT(impl.status_ != status::open);
  129. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  130. goto upcall;
  131. }
  132. else
  133. {
  134. // Make sure the stream is not closed
  135. if( impl.status_ == status::closed ||
  136. impl.status_ == status::failed)
  137. {
  138. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  139. goto upcall;
  140. }
  141. }
  142. // if status_ == status::closing, we want to suspend
  143. // the read operation until the close completes,
  144. // then finish the read with operation_aborted.
  145. loop:
  146. BOOST_ASSERT(impl.rd_block.is_locked(this));
  147. // See if we need to read a frame header. This
  148. // condition is structured to give the decompressor
  149. // a chance to emit the final empty deflate block
  150. //
  151. if(impl.rd_remain == 0 &&
  152. (! impl.rd_fh.fin || impl.rd_done))
  153. {
  154. // Read frame header
  155. while(! impl.parse_fh(
  156. impl.rd_fh, impl.rd_buf, result_))
  157. {
  158. if(result_)
  159. {
  160. // _Fail the WebSocket Connection_
  161. if(result_ == error::message_too_big)
  162. code_ = close_code::too_big;
  163. else
  164. code_ = close_code::protocol_error;
  165. goto close;
  166. }
  167. BOOST_ASSERT(impl.rd_block.is_locked(this));
  // parse_fh() returned false without an error: read more bytes from the
  // stream into rd_buf and try again.
  168. BOOST_ASIO_CORO_YIELD
  169. {
  170. BOOST_ASIO_HANDLER_LOCATION((
  171. __FILE__, __LINE__,
  172. "websocket::async_read_some"));
  173. impl.stream().async_read_some(
  174. impl.rd_buf.prepare(read_size(
  175. impl.rd_buf, impl.rd_buf.max_size())),
  176. std::move(*this));
  177. }
  178. BOOST_ASSERT(impl.rd_block.is_locked(this));
  179. impl.rd_buf.commit(bytes_transferred);
  180. if(impl.check_stop_now(ec))
  181. goto upcall;
  182. impl.reset_idle();
  183. // Allow a close operation
  184. // to acquire the read block
  185. impl.rd_block.unlock(this);
  186. if( impl.op_r_close.maybe_invoke())
  187. {
  188. // Suspend
  189. BOOST_ASSERT(impl.rd_block.is_locked());
  190. goto do_suspend;
  191. }
  192. // Acquire read block
  193. impl.rd_block.lock(this);
  194. }
  195. // Immediately apply the mask to the portion
  196. // of the buffer holding payload data.
  197. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  198. detail::mask_inplace(buffers_prefix(
  199. clamp(impl.rd_fh.len),
  200. impl.rd_buf.data()),
  201. impl.rd_key);
  202. if(detail::is_control(impl.rd_fh.op))
  203. {
  204. // Clear this otherwise the next
  205. // frame will be considered final.
  206. impl.rd_fh.fin = false;
  207. // Handle ping frame
  208. if(impl.rd_fh.op == detail::opcode::ping)
  209. {
  // When a control callback is installed and this is still the initial
  // (cont == false) invocation, dispatch first so that cont is true by the
  // time the callback is called.
  210. if(impl.ctrl_cb)
  211. {
  212. if(! cont)
  213. {
  214. BOOST_ASIO_CORO_YIELD
  215. {
  216. BOOST_ASIO_HANDLER_LOCATION((
  217. __FILE__, __LINE__,
  218. "websocket::async_read_some"));
  219. const auto ex = this->get_immediate_executor();
  220. net::dispatch(ex, std::move(*this));
  221. }
  222. BOOST_ASSERT(cont);
  223. // VFALCO call check_stop_now() here?
  224. }
  225. }
  226. {
  227. auto const b = buffers_prefix(
  228. clamp(impl.rd_fh.len),
  229. impl.rd_buf.data());
  230. auto const len = buffer_bytes(b);
  231. BOOST_ASSERT(len == impl.rd_fh.len);
  232. ping_data payload;
  233. detail::read_ping(payload, b);
  234. impl.rd_buf.consume(len);
  235. // Ignore ping when closing
  236. if(impl.status_ == status::closing)
  237. goto loop;
  238. if(impl.ctrl_cb)
  239. impl.ctrl_cb(
  240. frame_type::ping, to_string_view(payload));
  // Serialize the pong reply (echoing the ping payload) into rd_fb.
  241. impl.rd_fb.clear();
  242. impl.template write_ping<
  243. flat_static_buffer_base>(impl.rd_fb,
  244. detail::opcode::pong, payload);
  245. }
  246. // Allow a close operation
  247. // to acquire the read block
  248. impl.rd_block.unlock(this);
  249. impl.op_r_close.maybe_invoke();
  250. // Acquire the write lock
  251. if(! impl.wr_block.try_lock(this))
  252. {
  253. BOOST_ASIO_CORO_YIELD
  254. {
  255. BOOST_ASIO_HANDLER_LOCATION((
  256. __FILE__, __LINE__,
  257. "websocket::async_read_some"));
  258. impl.op_rd.emplace(std::move(*this));
  259. }
  260. if (ec)
  261. return this->complete(cont, ec, bytes_written_);
  262. impl.wr_block.lock(this);
  263. BOOST_ASIO_CORO_YIELD
  264. {
  265. BOOST_ASIO_HANDLER_LOCATION((
  266. __FILE__, __LINE__,
  267. "websocket::async_read_some"));
  268. const auto ex = this->get_immediate_executor();
  269. net::dispatch(ex, std::move(*this));
  270. }
  271. BOOST_ASSERT(impl.wr_block.is_locked(this));
  272. if(impl.check_stop_now(ec))
  273. goto upcall;
  274. }
  275. // Send pong
  276. BOOST_ASSERT(impl.wr_block.is_locked(this));
  277. BOOST_ASIO_CORO_YIELD
  278. {
  279. BOOST_ASIO_HANDLER_LOCATION((
  280. __FILE__, __LINE__,
  281. "websocket::async_read_some"));
  282. net::async_write(
  283. impl.stream(), net::const_buffer(impl.rd_fb.data()),
  284. beast::detail::bind_continuation(std::move(*this)));
  285. }
  286. BOOST_ASSERT(impl.wr_block.is_locked(this));
  287. if(impl.check_stop_now(ec))
  288. goto upcall;
  // Release the write lock; the first slot whose maybe_invoke() returns true
  // short-circuits the rest of the chain.
  289. impl.wr_block.unlock(this);
  290. impl.op_close.maybe_invoke()
  291. || impl.op_idle_ping.maybe_invoke()
  292. || impl.op_ping.maybe_invoke()
  293. || impl.op_wr.maybe_invoke();
  294. goto acquire_read_lock;
  295. }
  296. // Handle pong frame
  297. if(impl.rd_fh.op == detail::opcode::pong)
  298. {
  299. // Ignore pong when closing
  300. if(! impl.wr_close && impl.ctrl_cb)
  301. {
  302. if(! cont)
  303. {
  304. BOOST_ASIO_CORO_YIELD
  305. {
  306. BOOST_ASIO_HANDLER_LOCATION((
  307. __FILE__, __LINE__,
  308. "websocket::async_read_some"));
  309. const auto ex = this->get_immediate_executor();
  310. net::dispatch(ex, std::move(*this));
  311. }
  312. BOOST_ASSERT(cont);
  313. }
  314. }
  315. auto const cb = buffers_prefix(clamp(
  316. impl.rd_fh.len), impl.rd_buf.data());
  317. auto const len = buffer_bytes(cb);
  318. BOOST_ASSERT(len == impl.rd_fh.len);
  319. ping_data payload;
  320. detail::read_ping(payload, cb);
  321. impl.rd_buf.consume(len);
  322. // Ignore pong when closing
  323. if(! impl.wr_close && impl.ctrl_cb)
  324. impl.ctrl_cb(frame_type::pong, to_string_view(payload));
  325. goto loop;
  326. }
  327. // Handle close frame
  328. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  329. {
  330. if(impl.ctrl_cb)
  331. {
  332. if(! cont)
  333. {
  334. BOOST_ASIO_CORO_YIELD
  335. {
  336. BOOST_ASIO_HANDLER_LOCATION((
  337. __FILE__, __LINE__,
  338. "websocket::async_read_some"));
  339. const auto ex = this->get_immediate_executor();
  340. net::dispatch(ex, std::move(*this));
  341. }
  342. BOOST_ASSERT(cont);
  343. }
  344. }
  345. auto const cb = buffers_prefix(clamp(
  346. impl.rd_fh.len), impl.rd_buf.data());
  347. auto const len = buffer_bytes(cb);
  348. BOOST_ASSERT(len == impl.rd_fh.len);
  349. BOOST_ASSERT(! impl.rd_close);
  350. impl.rd_close = true;
  351. close_reason cr;
  352. detail::read_close(cr, cb, result_);
  353. if(result_)
  354. {
  355. // _Fail the WebSocket Connection_
  356. code_ = close_code::protocol_error;
  357. goto close;
  358. }
  359. impl.cr = cr;
  360. impl.rd_buf.consume(len);
  361. if(impl.ctrl_cb)
  362. impl.ctrl_cb(frame_type::close,
  363. to_string_view(impl.cr.reason));
  364. // See if we are already closing
  365. if(impl.status_ == status::closing)
  366. {
  367. // _Close the WebSocket Connection_
  368. BOOST_ASSERT(impl.wr_close);
  369. code_ = close_code::none;
  370. result_ = error::closed;
  371. goto close;
  372. }
  373. // _Start the WebSocket Closing Handshake_
  374. code_ = cr.code == close_code::none ?
  375. close_code::normal :
  376. static_cast<close_code>(cr.code);
  377. result_ = error::closed;
  378. goto close;
  379. }
  380. }
  381. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  382. {
  383. // Empty non-final frame
  384. goto loop;
  385. }
  386. impl.rd_done = false;
  387. }
  // Payload handling: the uncompressed path comes first, the deflate path
  // is the else branch further down.
  388. if(! impl.rd_deflated())
  389. {
  390. if(impl.rd_remain > 0)
  391. {
  392. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  393. (std::min)(clamp(impl.rd_remain),
  394. buffer_bytes(cb_)))
  395. {
  396. // Fill the read buffer first, otherwise we
  397. // get fewer bytes at the cost of one I/O.
  398. BOOST_ASIO_CORO_YIELD
  399. {
  400. BOOST_ASIO_HANDLER_LOCATION((
  401. __FILE__, __LINE__,
  402. "websocket::async_read_some"));
  403. impl.stream().async_read_some(
  404. impl.rd_buf.prepare(read_size(
  405. impl.rd_buf, impl.rd_buf.max_size())),
  406. std::move(*this));
  407. }
  408. impl.rd_buf.commit(bytes_transferred);
  409. if(impl.check_stop_now(ec))
  410. goto upcall;
  411. impl.reset_idle();
  412. if(impl.rd_fh.mask)
  413. detail::mask_inplace(buffers_prefix(clamp(
  414. impl.rd_remain), impl.rd_buf.data()),
  415. impl.rd_key);
  416. }
  417. if(impl.rd_buf.size() > 0)
  418. {
  419. // Copy from the read buffer.
  420. // The mask was already applied.
  421. bytes_transferred = net::buffer_copy(cb_,
  422. impl.rd_buf.data(), clamp(impl.rd_remain));
  423. auto const mb = buffers_prefix(
  424. bytes_transferred, cb_);
  425. impl.rd_remain -= bytes_transferred;
  426. if(impl.rd_op == detail::opcode::text)
  427. {
  428. if(! impl.rd_utf8.write(mb) ||
  429. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  430. ! impl.rd_utf8.finish()))
  431. {
  432. // _Fail the WebSocket Connection_
  433. code_ = close_code::bad_payload;
  434. result_ = error::bad_frame_payload;
  435. goto close;
  436. }
  437. }
  438. bytes_written_ += bytes_transferred;
  439. impl.rd_size += bytes_transferred;
  440. impl.rd_buf.consume(bytes_transferred);
  441. }
  442. else
  443. {
  444. // Read into caller's buffer
  445. BOOST_ASSERT(impl.rd_remain > 0);
  446. BOOST_ASSERT(buffer_bytes(cb_) > 0);
  447. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  448. clamp(impl.rd_remain), cb_)) > 0);
  449. BOOST_ASIO_CORO_YIELD
  450. {
  451. BOOST_ASIO_HANDLER_LOCATION((
  452. __FILE__, __LINE__,
  453. "websocket::async_read_some"));
  454. impl.stream().async_read_some(buffers_prefix(
  455. clamp(impl.rd_remain), cb_), std::move(*this));
  456. }
  457. if(impl.check_stop_now(ec))
  458. goto upcall;
  459. impl.reset_idle();
  460. BOOST_ASSERT(bytes_transferred > 0);
  461. auto const mb = buffers_prefix(
  462. bytes_transferred, cb_);
  463. impl.rd_remain -= bytes_transferred;
  // These bytes bypassed rd_buf, so the mask has not been applied yet.
  464. if(impl.rd_fh.mask)
  465. detail::mask_inplace(mb, impl.rd_key);
  466. if(impl.rd_op == detail::opcode::text)
  467. {
  468. if(! impl.rd_utf8.write(mb) ||
  469. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  470. ! impl.rd_utf8.finish()))
  471. {
  472. // _Fail the WebSocket Connection_
  473. code_ = close_code::bad_payload;
  474. result_ = error::bad_frame_payload;
  475. goto close;
  476. }
  477. }
  478. bytes_written_ += bytes_transferred;
  479. impl.rd_size += bytes_transferred;
  480. }
  481. }
  482. BOOST_ASSERT( ! impl.rd_done );
  483. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  484. impl.rd_done = true;
  485. }
  486. else
  487. {
  488. // Read compressed message frame payload:
  489. // inflate even if rd_fh_.len == 0, otherwise we
  490. // never emit the end-of-stream deflate block.
  491. while(buffer_bytes(cb_) > 0)
  492. {
  493. if( impl.rd_remain > 0 &&
  494. impl.rd_buf.size() == 0 &&
  495. ! did_read_)
  496. {
  497. // read new
  498. BOOST_ASIO_CORO_YIELD
  499. {
  500. BOOST_ASIO_HANDLER_LOCATION((
  501. __FILE__, __LINE__,
  502. "websocket::async_read_some"));
  503. impl.stream().async_read_some(
  504. impl.rd_buf.prepare(read_size(
  505. impl.rd_buf, impl.rd_buf.max_size())),
  506. std::move(*this));
  507. }
  508. if(impl.check_stop_now(ec))
  509. goto upcall;
  510. impl.reset_idle();
  511. BOOST_ASSERT(bytes_transferred > 0);
  512. impl.rd_buf.commit(bytes_transferred);
  513. if(impl.rd_fh.mask)
  514. detail::mask_inplace(
  515. buffers_prefix(clamp(impl.rd_remain),
  516. impl.rd_buf.data()), impl.rd_key);
  517. did_read_ = true;
  518. }
  // Point the inflater's output at the first buffer of the remaining
  // caller-supplied space.
  519. zlib::z_params zs;
  520. {
  521. auto const out = buffers_front(cb_);
  522. zs.next_out = out.data();
  523. zs.avail_out = out.size();
  524. BOOST_ASSERT(zs.avail_out > 0);
  525. }
  526. // boolean to track the end of the message.
  527. bool fin = false;
  528. if(impl.rd_remain > 0)
  529. {
  530. if(impl.rd_buf.size() > 0)
  531. {
  532. // use what's there
  533. auto const in = buffers_prefix(
  534. clamp(impl.rd_remain), buffers_front(
  535. impl.rd_buf.data()));
  536. zs.avail_in = in.size();
  537. zs.next_in = in.data();
  538. }
  539. else
  540. {
  541. break;
  542. }
  543. }
  544. else if(impl.rd_fh.fin)
  545. {
  546. // append the empty block codes
  547. static std::uint8_t constexpr
  548. empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
  549. zs.next_in = empty_block;
  550. zs.avail_in = sizeof(empty_block);
  551. fin = true;
  552. }
  553. else
  554. {
  555. break;
  556. }
  557. impl.inflate(zs, zlib::Flush::sync, ec);
  558. if(impl.check_stop_now(ec))
  559. goto upcall;
  // The trailing empty block produced no output: the message is complete.
  560. if(fin && zs.total_out == 0) {
  561. impl.do_context_takeover_read(impl.role);
  562. impl.rd_done = true;
  563. break;
  564. }
  565. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  566. impl.rd_size, zs.total_out, impl.rd_msg_max))
  567. {
  568. // _Fail the WebSocket Connection_
  569. code_ = close_code::too_big;
  570. result_ = error::message_too_big;
  571. goto close;
  572. }
  573. cb_.consume(zs.total_out);
  574. impl.rd_size += zs.total_out;
  // Input is only consumed from rd_buf when it came from rd_buf (not from
  // the static empty_block).
  575. if (! fin) {
  576. impl.rd_remain -= zs.total_in;
  577. impl.rd_buf.consume(zs.total_in);
  578. }
  579. bytes_written_ += zs.total_out;
  580. }
  581. if(impl.rd_op == detail::opcode::text)
  582. {
  583. // check utf8
  584. if(! impl.rd_utf8.write(
  585. buffers_prefix(bytes_written_, bs_)) || (
  586. impl.rd_done && ! impl.rd_utf8.finish()))
  587. {
  588. // _Fail the WebSocket Connection_
  589. code_ = close_code::bad_payload;
  590. result_ = error::bad_frame_payload;
  591. goto close;
  592. }
  593. }
  594. }
  595. goto upcall;
  // Close path: take the write lock, send a close frame carrying code_ unless
  // one was already written (wr_close), tear the stream down, then fall
  // through to upcall.
  596. close:
  597. // Acquire the write lock
  598. if(! impl.wr_block.try_lock(this))
  599. {
  600. BOOST_ASIO_CORO_YIELD
  601. {
  602. BOOST_ASIO_HANDLER_LOCATION((
  603. __FILE__, __LINE__,
  604. "websocket::async_read_some"));
  605. impl.op_rd.emplace(std::move(*this));
  606. }
  607. if (ec)
  608. return this->complete(cont, ec, bytes_written_);
  609. impl.wr_block.lock(this);
  610. BOOST_ASIO_CORO_YIELD
  611. {
  612. BOOST_ASIO_HANDLER_LOCATION((
  613. __FILE__, __LINE__,
  614. "websocket::async_read_some"));
  615. const auto ex = this->get_immediate_executor();
  616. net::dispatch(ex, std::move(*this));
  617. }
  618. BOOST_ASSERT(impl.wr_block.is_locked(this));
  619. if(impl.check_stop_now(ec))
  620. goto upcall;
  621. }
  622. impl.change_status(status::closing);
  623. if(! impl.wr_close)
  624. {
  625. impl.wr_close = true;
  626. // Serialize close frame
  627. impl.rd_fb.clear();
  628. impl.template write_close<
  629. flat_static_buffer_base>(
  630. impl.rd_fb, code_);
  631. // Send close frame
  632. BOOST_ASSERT(impl.wr_block.is_locked(this));
  633. BOOST_ASIO_CORO_YIELD
  634. {
  635. BOOST_ASIO_HANDLER_LOCATION((
  636. __FILE__, __LINE__,
  637. "websocket::async_read_some"));
  638. net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
  639. beast::detail::bind_continuation(std::move(*this)));
  640. }
  641. BOOST_ASSERT(impl.wr_block.is_locked(this));
  642. if(impl.check_stop_now(ec))
  643. goto upcall;
  644. }
  645. // Teardown
  646. using beast::websocket::async_teardown;
  647. BOOST_ASSERT(impl.wr_block.is_locked(this));
  648. BOOST_ASIO_CORO_YIELD
  649. {
  650. BOOST_ASIO_HANDLER_LOCATION((
  651. __FILE__, __LINE__,
  652. "websocket::async_read_some"));
  653. async_teardown(impl.role, impl.stream(),
  654. beast::detail::bind_continuation(std::move(*this)));
  655. }
  656. BOOST_ASSERT(impl.wr_block.is_locked(this));
  657. if(ec == net::error::eof)
  658. {
  659. // Rationale:
  660. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  661. ec = {};
  662. }
  // A clean teardown reports the error recorded before entering the close
  // path; a teardown failure takes precedence.
  663. if(! ec)
  664. {
  665. BOOST_BEAST_ASSIGN_EC(ec, result_);
  666. }
  667. if(ec && ec != error::closed)
  668. impl.change_status(status::failed);
  669. else
  670. impl.change_status(status::closed);
  671. impl.close();
  // Common exit: drop whichever locks this operation holds, call
  // maybe_invoke() on the waiting-operation slots, then deliver the result.
  672. upcall:
  673. impl.rd_block.try_unlock(this);
  674. impl.op_r_close.maybe_invoke();
  675. if(impl.wr_block.try_unlock(this))
  676. impl.op_close.maybe_invoke()
  677. || impl.op_idle_ping.maybe_invoke()
  678. || impl.op_ping.maybe_invoke()
  679. || impl.op_wr.maybe_invoke();
  680. this->complete(cont, ec, bytes_written_);
  681. }
  682. }
  683. };
  684. //------------------------------------------------------------------------------
  // Composed operation labelled "websocket::async_read" in its
  // handler-location marker.
  //
  // A stackless coroutine that repeatedly prepares space in a DynamicBuffer,
  // runs a read_some_op into that space (with this object as its completion
  // handler), commits the bytes received and accumulates the total. It loops
  // until an error occurs, `some_` is true, or impl.rd_done becomes true.
  685. template<class NextLayer, bool deflateSupported>
  686. template<class Handler, class DynamicBuffer>
  687. class stream<NextLayer, deflateSupported>::read_op
  688. : public beast::async_base<
  689. Handler, beast::executor_type<stream>>
  690. , public asio::coroutine
  691. {
  // Weak reference to the stream implementation; locked on every re-entry.
  692. boost::weak_ptr<impl_type> wp_;
  // Caller-owned dynamic buffer, held by reference.
  693. DynamicBuffer& b_;
  // Upper bound on the size prepared per iteration; a constructor argument
  // of 0 is stored as the maximum std::size_t value.
  694. std::size_t limit_;
  // Total bytes committed to b_ so far; passed to the handler.
  695. std::size_t bytes_written_ = 0;
  // When true the loop runs exactly once.
  696. bool some_;
  697. public:
  // Stores the arguments and starts the operation right away by invoking
  // itself with cont == false.
  698. template<class Handler_>
  699. read_op(
  700. Handler_&& h,
  701. boost::shared_ptr<impl_type> const& sp,
  702. DynamicBuffer& b,
  703. std::size_t limit,
  704. bool some)
  705. : async_base<Handler,
  706. beast::executor_type<stream>>(
  707. std::forward<Handler_>(h),
  708. sp->stream().get_executor())
  709. , wp_(sp)
  710. , b_(b)
  711. , limit_(limit ? limit : (
  712. std::numeric_limits<std::size_t>::max)())
  713. , some_(some)
  714. {
  715. (*this)({}, 0, false);
  716. }
  // Coroutine body.
  //
  // ec                - result of the read_some_op that resumed this coroutine.
  // bytes_transferred - bytes that read_some_op stored in the prepared area.
  // cont              - forwarded to complete(); false only for the initial
  //                     call made from the constructor.
  717. void operator()(
  718. error_code ec = {},
  719. std::size_t bytes_transferred = 0,
  720. bool cont = true)
  721. {
  722. using beast::detail::clamp;
  // If the implementation has expired, finish with operation_aborted and
  // zero bytes.
  723. auto sp = wp_.lock();
  724. if(! sp)
  725. {
  726. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  727. bytes_written_ = 0;
  728. return this->complete(cont, ec, bytes_written_);
  729. }
  730. auto& impl = *sp;
  731. using mutable_buffers_type = typename
  732. DynamicBuffer::mutable_buffers_type;
  733. BOOST_ASIO_CORO_REENTER(*this)
  734. {
  735. do
  736. {
  737. // VFALCO TODO use boost::beast::bind_continuation
  738. BOOST_ASIO_CORO_YIELD
  739. {
  // Reserve writable space sized by the implementation's hint, capped at
  // limit_; error::buffer_overflow is the error passed to the prepare helper.
  740. auto mb = beast::detail::dynamic_buffer_prepare(b_,
  741. clamp(impl.read_size_hint_db(b_), limit_),
  742. ec, error::buffer_overflow);
  743. if(impl.check_stop_now(ec))
  744. goto upcall;
  745. BOOST_ASIO_HANDLER_LOCATION((
  746. __FILE__, __LINE__,
  747. "websocket::async_read"));
  // Constructing the read_some_op starts it; this object is moved in as
  // its completion handler.
  748. read_some_op<read_op, mutable_buffers_type>(
  749. std::move(*this), sp, *mb);
  750. }
  751. b_.commit(bytes_transferred);
  752. bytes_written_ += bytes_transferred;
  753. if(ec)
  754. goto upcall;
  755. }
  756. while(! some_ && ! impl.rd_done);
  757. upcall:
  758. this->complete(cont, ec, bytes_written_);
  759. }
  760. }
  761. };
  762. template<class NextLayer, bool deflateSupported>
  763. struct stream<NextLayer, deflateSupported>::
  764. run_read_some_op
  765. {
  766. boost::shared_ptr<impl_type> const& self;
  767. using executor_type = typename stream::executor_type;
  768. executor_type
  769. get_executor() const noexcept
  770. {
  771. return self->stream().get_executor();
  772. }
  773. template<
  774. class ReadHandler,
  775. class MutableBufferSequence>
  776. void
  777. operator()(
  778. ReadHandler&& h,
  779. MutableBufferSequence const& b)
  780. {
  781. // If you get an error on the following line it means
  782. // that your handler does not meet the documented type
  783. // requirements for the handler.
  784. static_assert(
  785. beast::detail::is_invocable<ReadHandler,
  786. void(error_code, std::size_t)>::value,
  787. "ReadHandler type requirements not met");
  788. read_some_op<
  789. typename std::decay<ReadHandler>::type,
  790. MutableBufferSequence>(
  791. std::forward<ReadHandler>(h),
  792. self,
  793. b);
  794. }
  795. };
  796. template<class NextLayer, bool deflateSupported>
  797. struct stream<NextLayer, deflateSupported>::
  798. run_read_op
  799. {
  800. boost::shared_ptr<impl_type> const& self;
  801. using executor_type = typename stream::executor_type;
  802. executor_type
  803. get_executor() const noexcept
  804. {
  805. return self->stream().get_executor();
  806. }
  807. template<
  808. class ReadHandler,
  809. class DynamicBuffer>
  810. void
  811. operator()(
  812. ReadHandler&& h,
  813. DynamicBuffer* b,
  814. std::size_t limit,
  815. bool some)
  816. {
  817. // If you get an error on the following line it means
  818. // that your handler does not meet the documented type
  819. // requirements for the handler.
  820. static_assert(
  821. beast::detail::is_invocable<ReadHandler,
  822. void(error_code, std::size_t)>::value,
  823. "ReadHandler type requirements not met");
  824. read_op<
  825. typename std::decay<ReadHandler>::type,
  826. DynamicBuffer>(
  827. std::forward<ReadHandler>(h),
  828. self,
  829. *b,
  830. limit,
  831. some);
  832. }
  833. };
  834. //------------------------------------------------------------------------------
  835. template<class NextLayer, bool deflateSupported>
  836. template<class DynamicBuffer>
  837. std::size_t
  838. stream<NextLayer, deflateSupported>::
  839. read(DynamicBuffer& buffer)
  840. {
  841. static_assert(is_sync_stream<next_layer_type>::value,
  842. "SyncStream type requirements not met");
  843. static_assert(
  844. net::is_dynamic_buffer<DynamicBuffer>::value,
  845. "DynamicBuffer type requirements not met");
  846. error_code ec;
  847. auto const bytes_written = read(buffer, ec);
  848. if(ec)
  849. BOOST_THROW_EXCEPTION(system_error{ec});
  850. return bytes_written;
  851. }
  852. template<class NextLayer, bool deflateSupported>
  853. template<class DynamicBuffer>
  854. std::size_t
  855. stream<NextLayer, deflateSupported>::
  856. read(DynamicBuffer& buffer, error_code& ec)
  857. {
  858. static_assert(is_sync_stream<next_layer_type>::value,
  859. "SyncStream type requirements not met");
  860. static_assert(
  861. net::is_dynamic_buffer<DynamicBuffer>::value,
  862. "DynamicBuffer type requirements not met");
  863. std::size_t bytes_written = 0;
  864. do
  865. {
  866. bytes_written += read_some(buffer, 0, ec);
  867. if(ec)
  868. return bytes_written;
  869. }
  870. while(! is_message_done());
  871. return bytes_written;
  872. }
  873. template<class NextLayer, bool deflateSupported>
  874. template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  875. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  876. stream<NextLayer, deflateSupported>::
  877. async_read(DynamicBuffer& buffer, ReadHandler&& handler)
  878. {
  879. static_assert(is_async_stream<next_layer_type>::value,
  880. "AsyncStream type requirements not met");
  881. static_assert(
  882. net::is_dynamic_buffer<DynamicBuffer>::value,
  883. "DynamicBuffer type requirements not met");
  884. return net::async_initiate<
  885. ReadHandler,
  886. void(error_code, std::size_t)>(
  887. run_read_op{impl_},
  888. handler,
  889. &buffer,
  890. 0,
  891. false);
  892. }
  893. //------------------------------------------------------------------------------
  894. template<class NextLayer, bool deflateSupported>
  895. template<class DynamicBuffer>
  896. std::size_t
  897. stream<NextLayer, deflateSupported>::
  898. read_some(
  899. DynamicBuffer& buffer,
  900. std::size_t limit)
  901. {
  902. static_assert(is_sync_stream<next_layer_type>::value,
  903. "SyncStream type requirements not met");
  904. static_assert(
  905. net::is_dynamic_buffer<DynamicBuffer>::value,
  906. "DynamicBuffer type requirements not met");
  907. error_code ec;
  908. auto const bytes_written =
  909. read_some(buffer, limit, ec);
  910. if(ec)
  911. BOOST_THROW_EXCEPTION(system_error{ec});
  912. return bytes_written;
  913. }
  914. template<class NextLayer, bool deflateSupported>
  915. template<class DynamicBuffer>
  916. std::size_t
  917. stream<NextLayer, deflateSupported>::
  918. read_some(
  919. DynamicBuffer& buffer,
  920. std::size_t limit,
  921. error_code& ec)
  922. {
  923. static_assert(is_sync_stream<next_layer_type>::value,
  924. "SyncStream type requirements not met");
  925. static_assert(
  926. net::is_dynamic_buffer<DynamicBuffer>::value,
  927. "DynamicBuffer type requirements not met");
  928. using beast::detail::clamp;
  929. if(! limit)
  930. limit = (std::numeric_limits<std::size_t>::max)();
  931. auto const size =
  932. clamp(impl_->read_size_hint_db(buffer), limit);
  933. BOOST_ASSERT(size > 0);
  934. auto mb = beast::detail::dynamic_buffer_prepare(
  935. buffer, size, ec, error::buffer_overflow);
  936. if(impl_->check_stop_now(ec))
  937. return 0;
  938. auto const bytes_written = read_some(*mb, ec);
  939. buffer.commit(bytes_written);
  940. return bytes_written;
  941. }
  942. template<class NextLayer, bool deflateSupported>
  943. template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  944. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  945. stream<NextLayer, deflateSupported>::
  946. async_read_some(
  947. DynamicBuffer& buffer,
  948. std::size_t limit,
  949. ReadHandler&& handler)
  950. {
  951. static_assert(is_async_stream<next_layer_type>::value,
  952. "AsyncStream type requirements not met");
  953. static_assert(
  954. net::is_dynamic_buffer<DynamicBuffer>::value,
  955. "DynamicBuffer type requirements not met");
  956. return net::async_initiate<
  957. ReadHandler,
  958. void(error_code, std::size_t)>(
  959. run_read_op{impl_},
  960. handler,
  961. &buffer,
  962. limit,
  963. true);
  964. }
  965. //------------------------------------------------------------------------------
  966. template<class NextLayer, bool deflateSupported>
  967. template<class MutableBufferSequence>
  968. std::size_t
  969. stream<NextLayer, deflateSupported>::
  970. read_some(
  971. MutableBufferSequence const& buffers)
  972. {
  973. static_assert(is_sync_stream<next_layer_type>::value,
  974. "SyncStream type requirements not met");
  975. static_assert(net::is_mutable_buffer_sequence<
  976. MutableBufferSequence>::value,
  977. "MutableBufferSequence type requirements not met");
  978. error_code ec;
  979. auto const bytes_written = read_some(buffers, ec);
  980. if(ec)
  981. BOOST_THROW_EXCEPTION(system_error{ec});
  982. return bytes_written;
  983. }
  // Synchronously read some message payload into a caller-supplied
  // mutable buffer sequence, reporting failure through `ec`.
  //
  // Overview of the visible control flow:
  //   1. `ec` is cleared and the stream is checked via
  //      `check_stop_now`.
  //   2. If no payload remains in the current frame (and either the
  //      frame was not final or the message is already done), a new
  //      frame header is parsed, reading from the next layer into
  //      `impl.rd_buf` as needed.
  //   3. Control frames are handled inline: a ping is answered with
  //      a pong (unless `impl.wr_close` is set), a pong only triggers
  //      the control callback, and a close frame ends the read via
  //      `do_fail(..., error::closed, ec)`. After ping/pong, control
  //      jumps back to `loop` to look for the next frame.
  //   4. Payload is delivered either directly (non-deflated) or by
  //      inflating it (deflated). Text messages are UTF-8 validated.
  //
  // @param buffers  Destination for payload bytes.
  // @param ec       Set to the error, if any; cleared on entry.
  // @return         The number of payload bytes written to `buffers`
  //                 by this call (the running count is also what is
  //                 returned on the error paths).
  template<class NextLayer, bool deflateSupported>
  template<class MutableBufferSequence>
  std::size_t
  stream<NextLayer, deflateSupported>::
  read_some(
      MutableBufferSequence const& buffers,
      error_code& ec)
  {
      static_assert(is_sync_stream<next_layer_type>::value,
          "SyncStream type requirements not met");
      static_assert(net::is_mutable_buffer_sequence<
              MutableBufferSequence>::value,
          "MutableBufferSequence type requirements not met");
      using beast::detail::clamp;
      auto& impl = *impl_;
      close_code code{};
      std::size_t bytes_written = 0;
      ec = {};
      // Make sure the stream is open
      if(impl.check_stop_now(ec))
          return bytes_written;
  loop:
      // See if we need to read a frame header. This
      // condition is structured to give the decompressor
      // a chance to emit the final empty deflate block
      //
      if(impl.rd_remain == 0 && (
          ! impl.rd_fh.fin || impl.rd_done))
      {
          // Read frame header
          // `result` carries parse errors separately from `ec`,
          // which is used for I/O on the next layer.
          error_code result;
          while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
          {
              if(result)
              {
                  // _Fail the WebSocket Connection_
                  if(result == error::message_too_big)
                      code = close_code::too_big;
                  else
                      code = close_code::protocol_error;
                  do_fail(code, result, ec);
                  return bytes_written;
              }
              // The header is not complete yet and no parse error
              // was reported: read more bytes into rd_buf and retry.
              auto const bytes_transferred =
                  impl.stream().read_some(
                      impl.rd_buf.prepare(read_size(
                          impl.rd_buf, impl.rd_buf.max_size())),
                      ec);
              impl.rd_buf.commit(bytes_transferred);
              if(impl.check_stop_now(ec))
                  return bytes_written;
          }
          // Immediately apply the mask to the portion
          // of the buffer holding payload data.
          if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
              detail::mask_inplace(buffers_prefix(
                  clamp(impl.rd_fh.len), impl.rd_buf.data()),
                      impl.rd_key);
          if(detail::is_control(impl.rd_fh.op))
          {
              // Get control frame payload
              auto const b = buffers_prefix(
                  clamp(impl.rd_fh.len), impl.rd_buf.data());
              auto const len = buffer_bytes(b);
              // The whole control payload is expected to already
              // be present in rd_buf at this point.
              BOOST_ASSERT(len == impl.rd_fh.len);
              // Clear this otherwise the next
              // frame will be considered final.
              impl.rd_fh.fin = false;
              // Handle ping frame
              if(impl.rd_fh.op == detail::opcode::ping)
              {
                  ping_data payload;
                  detail::read_ping(payload, b);
                  impl.rd_buf.consume(len);
                  if(impl.wr_close)
                  {
                      // Ignore ping when closing
                      goto loop;
                  }
                  // Notify the control callback, if one is set,
                  // before the pong is sent.
                  if(impl.ctrl_cb)
                      impl.ctrl_cb(frame_type::ping, to_string_view(payload));
                  // Build a pong carrying the ping's payload and
                  // write it synchronously to the next layer.
                  detail::frame_buffer fb;
                  impl.template write_ping<flat_static_buffer_base>(fb,
                      detail::opcode::pong, payload);
                  net::write(impl.stream(), fb.data(), ec);
                  if(impl.check_stop_now(ec))
                      return bytes_written;
                  goto loop;
              }
              // Handle pong frame
              if(impl.rd_fh.op == detail::opcode::pong)
              {
                  ping_data payload;
                  detail::read_ping(payload, b);
                  impl.rd_buf.consume(len);
                  // A pong needs no reply; only the callback runs.
                  if(impl.ctrl_cb)
                      impl.ctrl_cb(frame_type::pong, to_string_view(payload));
                  goto loop;
              }
              // Handle close frame
              BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
              {
                  BOOST_ASSERT(! impl.rd_close);
                  impl.rd_close = true;
                  close_reason cr;
                  detail::read_close(cr, b, result);
                  if(result)
                  {
                      // _Fail the WebSocket Connection_
                      do_fail(close_code::protocol_error,
                          result, ec);
                      return bytes_written;
                  }
                  // Record the peer's close reason before invoking
                  // the callback, which receives impl.cr.reason.
                  impl.cr = cr;
                  impl.rd_buf.consume(len);
                  if(impl.ctrl_cb)
                      impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
                  BOOST_ASSERT(! impl.wr_close);
                  // _Start the WebSocket Closing Handshake_
                  // A received code of `none` is answered with
                  // `normal`; otherwise the received code is echoed.
                  do_fail(
                      cr.code == close_code::none ?
                          close_code::normal :
                          static_cast<close_code>(cr.code),
                      error::closed, ec);
                  return bytes_written;
              }
          }
          if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
          {
              // Empty non-final frame
              goto loop;
          }
          impl.rd_done = false;
      }
      else
      {
          // No new header was needed; continue with the current
          // frame and make sure no stale error is reported.
          ec = {};
      }
      if(! impl.rd_deflated())
      {
          // Uncompressed message: move payload bytes to `buffers`.
          if(impl.rd_remain > 0)
          {
              if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
                  (std::min)(clamp(impl.rd_remain),
                      buffer_bytes(buffers)))
              {
                  // Fill the read buffer first, otherwise we
                  // get fewer bytes at the cost of one I/O.
                  impl.rd_buf.commit(impl.stream().read_some(
                      impl.rd_buf.prepare(read_size(impl.rd_buf,
                          impl.rd_buf.max_size())), ec));
                  if(impl.check_stop_now(ec))
                      return bytes_written;
                  // Unmask only the part of rd_buf that belongs to
                  // the remaining payload of this frame.
                  if(impl.rd_fh.mask)
                      detail::mask_inplace(
                          buffers_prefix(clamp(impl.rd_remain),
                              impl.rd_buf.data()), impl.rd_key);
              }
              if(impl.rd_buf.size() > 0)
              {
                  // Copy from the read buffer.
                  // The mask was already applied.
                  auto const bytes_transferred = net::buffer_copy(
                      buffers, impl.rd_buf.data(),
                          clamp(impl.rd_remain));
                  // `mb` views exactly the bytes just copied, for
                  // UTF-8 validation below.
                  auto const mb = buffers_prefix(
                      bytes_transferred, buffers);
                  impl.rd_remain -= bytes_transferred;
                  if(impl.rd_op == detail::opcode::text)
                  {
                      // Validate incrementally; finish() is only
                      // consulted once the final frame's payload
                      // has been fully received.
                      if(! impl.rd_utf8.write(mb) ||
                          (impl.rd_remain == 0 && impl.rd_fh.fin &&
                              ! impl.rd_utf8.finish()))
                      {
                          // _Fail the WebSocket Connection_
                          do_fail(close_code::bad_payload,
                              error::bad_frame_payload, ec);
                          return bytes_written;
                      }
                  }
                  bytes_written += bytes_transferred;
                  impl.rd_size += bytes_transferred;
                  impl.rd_buf.consume(bytes_transferred);
              }
              else
              {
                  // Read into caller's buffer
                  BOOST_ASSERT(impl.rd_remain > 0);
                  BOOST_ASSERT(buffer_bytes(buffers) > 0);
                  BOOST_ASSERT(buffer_bytes(buffers_prefix(
                      clamp(impl.rd_remain), buffers)) > 0);
                  // The read is bounded by rd_remain so it cannot
                  // run past the end of the current frame's payload.
                  auto const bytes_transferred =
                      impl.stream().read_some(buffers_prefix(
                          clamp(impl.rd_remain), buffers), ec);
                  // VFALCO What if some bytes were written?
                  if(impl.check_stop_now(ec))
                      return bytes_written;
                  BOOST_ASSERT(bytes_transferred > 0);
                  auto const mb = buffers_prefix(
                      bytes_transferred, buffers);
                  impl.rd_remain -= bytes_transferred;
                  // Bytes read directly into the caller's buffer
                  // have not been unmasked yet; do it in place.
                  if(impl.rd_fh.mask)
                      detail::mask_inplace(mb, impl.rd_key);
                  if(impl.rd_op == detail::opcode::text)
                  {
                      if(! impl.rd_utf8.write(mb) ||
                          (impl.rd_remain == 0 && impl.rd_fh.fin &&
                              ! impl.rd_utf8.finish()))
                      {
                          // _Fail the WebSocket Connection_
                          do_fail(close_code::bad_payload,
                              error::bad_frame_payload, ec);
                          return bytes_written;
                      }
                  }
                  bytes_written += bytes_transferred;
                  impl.rd_size += bytes_transferred;
              }
          }
          BOOST_ASSERT( ! impl.rd_done );
          // The message is done once the final frame's payload
          // has been fully delivered.
          if( impl.rd_remain == 0 && impl.rd_fh.fin )
              impl.rd_done = true;
      }
      else
      {
          // Read compressed message frame payload:
          // inflate even if rd_fh_.len == 0, otherwise we
          // never emit the end-of-stream deflate block.
          //
          // `did_read` allows at most one read from the next
          // layer per call on this path.
          bool did_read = false;
          // `cb` tracks the not-yet-filled part of `buffers`.
          buffers_suffix<MutableBufferSequence> cb(buffers);
          while(buffer_bytes(cb) > 0)
          {
              zlib::z_params zs;
              {
                  // Inflate into the first buffer of the remaining
                  // output sequence.
                  auto const out = beast::buffers_front(cb);
                  zs.next_out = out.data();
                  zs.avail_out = out.size();
                  BOOST_ASSERT(zs.avail_out > 0);
              }
              // boolean to track the end of the message.
              bool fin = false;
              if(impl.rd_remain > 0)
              {
                  if(impl.rd_buf.size() > 0)
                  {
                      // use what's there
                      auto const in = buffers_prefix(
                          clamp(impl.rd_remain), beast::buffers_front(
                              impl.rd_buf.data()));
                      zs.avail_in = in.size();
                      zs.next_in = in.data();
                  }
                  else if(! did_read)
                  {
                      // read new
                      auto const bytes_transferred =
                          impl.stream().read_some(
                              impl.rd_buf.prepare(read_size(
                                  impl.rd_buf, impl.rd_buf.max_size())),
                              ec);
                      if(impl.check_stop_now(ec))
                          return bytes_written;
                      BOOST_ASSERT(bytes_transferred > 0);
                      impl.rd_buf.commit(bytes_transferred);
                      // Unmask the newly read payload before it is
                      // handed to the inflater.
                      if(impl.rd_fh.mask)
                          detail::mask_inplace(
                              buffers_prefix(clamp(impl.rd_remain),
                                  impl.rd_buf.data()), impl.rd_key);
                      auto const in = buffers_prefix(
                          clamp(impl.rd_remain), buffers_front(
                              impl.rd_buf.data()));
                      zs.avail_in = in.size();
                      zs.next_in = in.data();
                      did_read = true;
                  }
                  else
                  {
                      // Input exhausted and this call already
                      // performed its one read: return what we have.
                      break;
                  }
              }
              else if(impl.rd_fh.fin)
              {
                  // append the empty block codes
                  static std::uint8_t constexpr
                      empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
                  zs.next_in = empty_block;
                  zs.avail_in = sizeof(empty_block);
                  fin = true;
              }
              else
              {
                  // Frame payload consumed but the message is not
                  // final: nothing more to inflate in this call.
                  break;
              }
              impl.inflate(zs, zlib::Flush::sync, ec);
              if(impl.check_stop_now(ec))
                  return bytes_written;
              // The trailing empty block produced no output: mark
              // the message done and stop.
              if (fin && zs.total_out == 0) {
                  impl.do_context_takeover_read(impl.role);
                  impl.rd_done = true;
                  break;
              }
              // Enforce the message size limit when one is set
              // (a zero rd_msg_max disables the check).
              if(impl.rd_msg_max && beast::detail::sum_exceeds(
                  impl.rd_size, zs.total_out, impl.rd_msg_max))
              {
                  do_fail(close_code::too_big,
                      error::message_too_big, ec);
                  return bytes_written;
              }
              cb.consume(zs.total_out);
              impl.rd_size += zs.total_out;
              // The synthetic empty block did not come from rd_buf,
              // so only real frame input is deducted and consumed.
              if (! fin) {
                  impl.rd_remain -= zs.total_in;
                  impl.rd_buf.consume(zs.total_in);
              }
              bytes_written += zs.total_out;
          }
          if(impl.rd_op == detail::opcode::text)
          {
              // check utf8
              // Validation covers everything inflated by this call.
              if(! impl.rd_utf8.write(beast::buffers_prefix(
                  bytes_written, buffers)) || (
                      impl.rd_done && ! impl.rd_utf8.finish()))
              {
                  // _Fail the WebSocket Connection_
                  do_fail(close_code::bad_payload,
                      error::bad_frame_payload, ec);
                  return bytes_written;
              }
          }
      }
      return bytes_written;
  }
  1317. template<class NextLayer, bool deflateSupported>
  1318. template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  1319. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  1320. stream<NextLayer, deflateSupported>::
  1321. async_read_some(
  1322. MutableBufferSequence const& buffers,
  1323. ReadHandler&& handler)
  1324. {
  1325. static_assert(is_async_stream<next_layer_type>::value,
  1326. "AsyncStream type requirements not met");
  1327. static_assert(net::is_mutable_buffer_sequence<
  1328. MutableBufferSequence>::value,
  1329. "MutableBufferSequence type requirements not met");
  1330. return net::async_initiate<
  1331. ReadHandler,
  1332. void(error_code, std::size_t)>(
  1333. run_read_some_op{impl_},
  1334. handler,
  1335. buffers);
  1336. }
  1337. } // websocket
  1338. } // beast
  1339. } // boost
  1340. #endif