write.hpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BHO_BEAST_WEBSOCKET_IMPL_WRITE_HPP
  10. #define BHO_BEAST_WEBSOCKET_IMPL_WRITE_HPP
  11. #include <asio2/bho/beast/websocket/detail/mask.hpp>
  12. #include <asio2/bho/beast/core/async_base.hpp>
  13. #include <asio2/bho/beast/core/bind_handler.hpp>
  14. #include <asio2/bho/beast/core/buffer_traits.hpp>
  15. #include <asio2/bho/beast/core/buffers_cat.hpp>
  16. #include <asio2/bho/beast/core/buffers_prefix.hpp>
  17. #include <asio2/bho/beast/core/buffers_range.hpp>
  18. #include <asio2/bho/beast/core/buffers_suffix.hpp>
  19. #include <asio2/bho/beast/core/flat_static_buffer.hpp>
  20. #include <asio2/bho/beast/core/stream_traits.hpp>
  21. #include <asio2/bho/beast/core/detail/bind_continuation.hpp>
  22. #include <asio2/bho/beast/core/detail/clamp.hpp>
  23. #include <asio2/bho/beast/core/detail/config.hpp>
  24. #include <asio2/bho/beast/websocket/detail/frame.hpp>
  25. #include <asio2/bho/beast/websocket/impl/stream_impl.hpp>
  26. #include <asio/coroutine.hpp>
  27. #include <asio2/bho/assert.hpp>
  28. #include <asio2/bho/config.hpp>
  29. #include <asio2/bho/throw_exception.hpp>
  30. #include <algorithm>
  31. #include <memory>
  32. namespace bho {
  33. namespace beast {
  34. namespace websocket {
  35. template<class NextLayer, bool deflateSupported>
  36. template<class Handler, class Buffers>
  37. class stream<NextLayer, deflateSupported>::write_some_op
  38. : public beast::async_base<
  39. Handler, beast::executor_type<stream>>
  40. , public asio::coroutine
  41. {
  42. enum
  43. {
  44. do_nomask_nofrag,
  45. do_nomask_frag,
  46. do_mask_nofrag,
  47. do_mask_frag,
  48. do_deflate
  49. };
  50. std::weak_ptr<impl_type> wp_;
  51. buffers_suffix<Buffers> cb_;
  52. detail::frame_header fh_;
  53. detail::prepared_key key_;
  54. std::size_t bytes_transferred_ = 0;
  55. std::size_t remain_;
  56. std::size_t in_;
  57. int how_;
  58. bool fin_;
  59. bool more_ = false; // for ubsan
  60. bool cont_ = false;
  61. public:
  62. static constexpr int id = 2; // for soft_mutex
  63. template<class Handler_>
  64. write_some_op(
  65. Handler_&& h,
  66. std::shared_ptr<impl_type> const& sp,
  67. bool fin,
  68. Buffers const& bs)
  69. : beast::async_base<Handler,
  70. beast::executor_type<stream>>(
  71. std::forward<Handler_>(h),
  72. sp->stream().get_executor())
  73. , wp_(sp)
  74. , cb_(bs)
  75. , fin_(fin)
  76. {
  77. auto& impl = *sp;
  78. // Set up the outgoing frame header
  79. if(! impl.wr_cont)
  80. {
  81. impl.begin_msg(beast::buffer_bytes(bs));
  82. fh_.rsv1 = impl.wr_compress;
  83. }
  84. else
  85. {
  86. fh_.rsv1 = false;
  87. }
  88. fh_.rsv2 = false;
  89. fh_.rsv3 = false;
  90. fh_.op = impl.wr_cont ?
  91. detail::opcode::cont : impl.wr_opcode;
  92. fh_.mask =
  93. impl.role == role_type::client;
  94. // Choose a write algorithm
  95. if(impl.wr_compress)
  96. {
  97. how_ = do_deflate;
  98. }
  99. else if(! fh_.mask)
  100. {
  101. if(! impl.wr_frag)
  102. {
  103. how_ = do_nomask_nofrag;
  104. }
  105. else
  106. {
  107. BHO_ASSERT(impl.wr_buf_size != 0);
  108. remain_ = beast::buffer_bytes(cb_);
  109. if(remain_ > impl.wr_buf_size)
  110. how_ = do_nomask_frag;
  111. else
  112. how_ = do_nomask_nofrag;
  113. }
  114. }
  115. else
  116. {
  117. if(! impl.wr_frag)
  118. {
  119. how_ = do_mask_nofrag;
  120. }
  121. else
  122. {
  123. BHO_ASSERT(impl.wr_buf_size != 0);
  124. remain_ = beast::buffer_bytes(cb_);
  125. if(remain_ > impl.wr_buf_size)
  126. how_ = do_mask_frag;
  127. else
  128. how_ = do_mask_nofrag;
  129. }
  130. }
  131. (*this)({}, 0, false);
  132. }
  133. void operator()(
  134. error_code ec = {},
  135. std::size_t bytes_transferred = 0,
  136. bool cont = true);
  137. };
  // Coroutine body of write_some_op.
  //
  // Acquires the stream's write lock, sends the payload using the
  // strategy stored in how_ (chosen by the constructor), releases the
  // lock and finally completes the handler with (ec, bytes_transferred_).
  //
  // ec                - result of the step that resumed the coroutine
  // bytes_transferred - byte count reported by that step
  // cont              - forwarded unchanged to complete()
  138. template<class NextLayer, bool deflateSupported>
  139. template<class Handler, class Buffers>
  140. void
  141. stream<NextLayer, deflateSupported>::
  142. write_some_op<Handler, Buffers>::
  143. operator()(
  144. error_code ec,
  145. std::size_t bytes_transferred,
  146. bool cont)
  147. {
  148. using beast::detail::clamp;
  // Plain locals: they are fresh on every (re-)entry of this function,
  // so values needed after a yield must be recomputed (see `n` below).
  149. std::size_t n;
  150. net::mutable_buffer b;
  // Only a weak reference to the implementation is held. If it can no
  // longer be locked, complete with operation_aborted and zero bytes.
  151. auto sp = wp_.lock();
  152. if(! sp)
  153. {
  154. BHO_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  155. bytes_transferred_ = 0;
  156. return this->complete(cont, ec, bytes_transferred_);
  157. }
  158. auto& impl = *sp;
  159. ASIO_CORO_REENTER(*this)
  160. {
  161. // Acquire the write lock
  162. if(! impl.wr_block.try_lock(this))
  163. {
  // Park this operation in impl.op_wr until it is resumed. The
  // fragmenting strategies also jump here after they gave up the
  // lock between frames and another operation was invoked.
  164. do_suspend:
  165. ASIO_CORO_YIELD
  166. {
  167. ASIO_HANDLER_LOCATION((
  168. __FILE__, __LINE__,
  169. fin_ ?
  170. "websocket::async_write" :
  171. "websocket::async_write_some"
  172. ));
  173. this->set_allowed_cancellation(net::cancellation_type::all);
  174. impl.op_wr.emplace(std::move(*this),
  175. net::cancellation_type::all);
  176. }
  // Resumed with an error: complete right away, without taking the lock.
  177. if (ec)
  178. return this->complete(cont, ec, bytes_transferred_);
  // From here on only terminal cancellation is allowed.
  179. this->set_allowed_cancellation(net::cancellation_type::terminal);
  180. impl.wr_block.lock(this);
  // Continue through the handler's immediate executor.
  181. ASIO_CORO_YIELD
  182. {
  183. ASIO_HANDLER_LOCATION((
  184. __FILE__, __LINE__,
  185. fin_ ?
  186. "websocket::async_write" :
  187. "websocket::async_write_some"
  188. ));
  189. const auto ex = this->get_immediate_executor();
  190. net::dispatch(ex, std::move(*this));
  191. }
  192. BHO_ASSERT(impl.wr_block.is_locked(this));
  193. }
  // If check_stop_now reports that the operation must not proceed,
  // go straight to completion.
  194. if(impl.check_stop_now(ec))
  195. goto upcall;
  196. //------------------------------------------------------------------
  // Strategy: unmasked, single frame. The header is serialized into
  // impl.wr_fb and sent together with the caller's buffers (cb_) in
  // one async_write.
  197. if(how_ == do_nomask_nofrag)
  198. {
  199. // send a single frame
  200. fh_.fin = fin_;
  201. fh_.len = beast::buffer_bytes(cb_);
  202. impl.wr_fb.clear();
  203. detail::write<flat_static_buffer_base>(
  204. impl.wr_fb, fh_);
  205. impl.wr_cont = ! fin_;
  // NOTE(review): every strategy hands buffers_cat the same four-slot
  // shape (header, scratch buffer, suffix, prefix) and leaves unused
  // slots empty - presumably so all paths share one buffers_cat type;
  // confirm.
  206. ASIO_CORO_YIELD
  207. {
  208. ASIO_HANDLER_LOCATION((
  209. __FILE__, __LINE__,
  210. fin_ ?
  211. "websocket::async_write" :
  212. "websocket::async_write_some"
  213. ));
  214. net::async_write(impl.stream(),
  215. buffers_cat(
  216. net::const_buffer(impl.wr_fb.data()),
  217. net::const_buffer(0, 0),
  218. cb_,
  219. buffers_prefix(0, cb_)
  220. ),
  221. beast::detail::bind_continuation(std::move(*this)));
  222. }
  // The full payload length is added before the result of the write
  // is examined.
  223. bytes_transferred_ += clamp(fh_.len);
  // Both outcomes end at upcall; check_stop_now is still called with ec.
  224. if(impl.check_stop_now(ec))
  225. goto upcall;
  226. goto upcall;
  227. }
  228. //------------------------------------------------------------------
  // Strategy: unmasked, auto-fragmented. Each frame carries at most
  // impl.wr_buf_size payload bytes taken directly from cb_.
  229. if(how_ == do_nomask_frag)
  230. {
  231. // send multiple frames
  232. for(;;)
  233. {
  234. n = clamp(remain_, impl.wr_buf_size);
  235. fh_.len = n;
  236. remain_ -= n;
  // The fin bit is set only on the last frame, and only if the caller
  // asked for a final fragment.
  237. fh_.fin = fin_ ? remain_ == 0 : false;
  238. impl.wr_fb.clear();
  239. detail::write<flat_static_buffer_base>(
  240. impl.wr_fb, fh_);
  241. impl.wr_cont = ! fin_;
  242. // Send frame
  243. ASIO_CORO_YIELD
  244. {
  245. ASIO_HANDLER_LOCATION((
  246. __FILE__, __LINE__,
  247. fin_ ?
  248. "websocket::async_write" :
  249. "websocket::async_write_some"
  250. ));
  251. buffers_suffix<Buffers> empty_cb(cb_);
  252. empty_cb.consume(~std::size_t(0));
  253. net::async_write(impl.stream(),
  254. buffers_cat(
  255. net::const_buffer(impl.wr_fb.data()),
  256. net::const_buffer(0, 0),
  257. empty_cb,
  258. buffers_prefix(clamp(fh_.len), cb_)
  259. ),
  260. beast::detail::bind_continuation(std::move(*this)));
  261. }
  262. n = clamp(fh_.len); // restore `n` on yield
  263. bytes_transferred_ += n;
  264. if(impl.check_stop_now(ec))
  265. goto upcall;
  266. if(remain_ == 0)
  267. break;
  // Advance past the bytes just sent; later frames are continuations.
  268. cb_.consume(n);
  269. fh_.op = detail::opcode::cont;
  270. // Give up the write lock in between each frame
  271. // so that outgoing control frames might be sent.
  272. impl.wr_block.unlock(this);
  // At most one suspended operation is invoked (short-circuit ||). If
  // one ran, the lock is asserted to be held again, so this operation
  // suspends until it is resumed.
  273. if( impl.op_close.maybe_invoke()
  274. || impl.op_idle_ping.maybe_invoke()
  275. || impl.op_rd.maybe_invoke()
  276. || impl.op_ping.maybe_invoke())
  277. {
  278. BHO_ASSERT(impl.wr_block.is_locked());
  279. goto do_suspend;
  280. }
  281. impl.wr_block.lock(this);
  282. }
  283. goto upcall;
  284. }
  285. //------------------------------------------------------------------
  // Strategy: masked, single frame. The payload is copied into
  // impl.wr_buf in pieces of at most impl.wr_buf_size bytes, masked in
  // place with one key, and sent with several async_write calls.
  286. if(how_ == do_mask_nofrag)
  287. {
  288. // send a single frame using multiple writes
  289. remain_ = beast::buffer_bytes(cb_);
  290. fh_.fin = fin_;
  291. fh_.len = remain_;
  292. fh_.key = impl.create_mask();
  293. detail::prepare_key(key_, fh_.key);
  294. impl.wr_fb.clear();
  295. detail::write<flat_static_buffer_base>(
  296. impl.wr_fb, fh_);
  297. n = clamp(remain_, impl.wr_buf_size);
  298. net::buffer_copy(net::buffer(
  299. impl.wr_buf.get(), n), cb_);
  300. detail::mask_inplace(net::buffer(
  301. impl.wr_buf.get(), n), key_);
  302. remain_ -= n;
  303. impl.wr_cont = ! fin_;
  304. // write frame header and some payload
  305. ASIO_CORO_YIELD
  306. {
  307. ASIO_HANDLER_LOCATION((
  308. __FILE__, __LINE__,
  309. fin_ ?
  310. "websocket::async_write" :
  311. "websocket::async_write_some"
  312. ));
  313. buffers_suffix<Buffers> empty_cb(cb_);
  314. empty_cb.consume(~std::size_t(0));
  315. net::async_write(impl.stream(),
  316. buffers_cat(
  317. net::const_buffer(impl.wr_fb.data()),
  318. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  319. empty_cb,
  320. buffers_prefix(0, empty_cb)
  321. ),
  322. beast::detail::bind_continuation(std::move(*this)));
  323. }
  // The reported count includes the frame header; only the payload part
  // is added to the total.
  324. // VFALCO What about consuming the buffer on error?
  325. bytes_transferred_ +=
  326. bytes_transferred - impl.wr_fb.size();
  327. if(impl.check_stop_now(ec))
  328. goto upcall;
  // Remaining payload: one scratch buffer's worth per write, no header.
  329. while(remain_ > 0)
  330. {
  331. cb_.consume(impl.wr_buf_size);
  332. n = clamp(remain_, impl.wr_buf_size);
  333. net::buffer_copy(net::buffer(
  334. impl.wr_buf.get(), n), cb_);
  335. detail::mask_inplace(net::buffer(
  336. impl.wr_buf.get(), n), key_);
  337. remain_ -= n;
  338. // write more payload
  339. ASIO_CORO_YIELD
  340. {
  341. ASIO_HANDLER_LOCATION((
  342. __FILE__, __LINE__,
  343. fin_ ?
  344. "websocket::async_write" :
  345. "websocket::async_write_some"
  346. ));
  347. buffers_suffix<Buffers> empty_cb(cb_);
  348. empty_cb.consume(~std::size_t(0));
  349. net::async_write(impl.stream(),
  350. buffers_cat(
  351. net::const_buffer(0, 0),
  352. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  353. empty_cb,
  354. buffers_prefix(0, empty_cb)
  355. ),
  356. beast::detail::bind_continuation(std::move(*this)));
  357. }
  358. bytes_transferred_ += bytes_transferred;
  359. if(impl.check_stop_now(ec))
  360. goto upcall;
  361. }
  362. goto upcall;
  363. }
  364. //------------------------------------------------------------------
  // Strategy: masked, auto-fragmented. Each frame gets a new masking
  // key and carries at most impl.wr_buf_size payload bytes, copied into
  // impl.wr_buf and masked in place.
  365. if(how_ == do_mask_frag)
  366. {
  367. // send multiple frames
  368. for(;;)
  369. {
  370. n = clamp(remain_, impl.wr_buf_size);
  371. remain_ -= n;
  372. fh_.len = n;
  373. fh_.key = impl.create_mask();
  374. fh_.fin = fin_ ? remain_ == 0 : false;
  375. detail::prepare_key(key_, fh_.key);
  376. net::buffer_copy(net::buffer(
  377. impl.wr_buf.get(), n), cb_);
  378. detail::mask_inplace(net::buffer(
  379. impl.wr_buf.get(), n), key_);
  380. impl.wr_fb.clear();
  381. detail::write<flat_static_buffer_base>(
  382. impl.wr_fb, fh_);
  383. impl.wr_cont = ! fin_;
  384. // Send frame
  385. ASIO_CORO_YIELD
  386. {
  387. ASIO_HANDLER_LOCATION((
  388. __FILE__, __LINE__,
  389. fin_ ?
  390. "websocket::async_write" :
  391. "websocket::async_write_some"
  392. ));
  393. buffers_suffix<Buffers> empty_cb(cb_);
  394. empty_cb.consume(~std::size_t(0));
  395. net::async_write(impl.stream(),
  396. buffers_cat(
  397. net::const_buffer(impl.wr_fb.data()),
  398. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  399. empty_cb,
  400. buffers_prefix(0, empty_cb)
  401. ),
  402. beast::detail::bind_continuation(std::move(*this)));
  403. }
  // Recompute the payload size of the frame from the reported byte
  // count minus the header size.
  404. n = bytes_transferred - impl.wr_fb.size();
  405. bytes_transferred_ += n;
  406. if(impl.check_stop_now(ec))
  407. goto upcall;
  408. if(remain_ == 0)
  409. break;
  410. cb_.consume(n);
  411. fh_.op = detail::opcode::cont;
  412. // Give up the write lock in between each frame
  413. // so that outgoing control frames might be sent.
  414. impl.wr_block.unlock(this);
  415. if( impl.op_close.maybe_invoke()
  416. || impl.op_idle_ping.maybe_invoke()
  417. || impl.op_rd.maybe_invoke()
  418. || impl.op_ping.maybe_invoke())
  419. {
  420. BHO_ASSERT(impl.wr_block.is_locked());
  421. goto do_suspend;
  422. }
  423. impl.wr_block.lock(this);
  424. }
  425. goto upcall;
  426. }
  427. //------------------------------------------------------------------
  // Strategy: compressed. impl.deflate() is given impl.wr_buf as output
  // space and cb_ as input; each non-empty output becomes one frame.
  428. if(how_ == do_deflate)
  429. {
  430. // send compressed frames
  431. for(;;)
  432. {
  433. b = net::buffer(impl.wr_buf.get(),
  434. impl.wr_buf_size);
  // NOTE(review): in_ is an output of deflate() that is added to
  // bytes_transferred_ after the frame is sent - presumably the number
  // of input bytes consumed by this call; confirm against deflate().
  435. more_ = impl.deflate(b, cb_, fin_, in_, ec);
  436. if(impl.check_stop_now(ec))
  437. goto upcall;
  438. n = beast::buffer_bytes(b);
  439. if(n == 0)
  440. {
  441. // The input was consumed, but there is
  442. // no output due to compression latency.
  443. BHO_ASSERT(! fin_);
  444. BHO_ASSERT(beast::buffer_bytes(cb_) == 0);
  445. goto upcall;
  446. }
  // Masked frames get a new key per frame; the key is a local because
  // it is not needed after the yield.
  447. if(fh_.mask)
  448. {
  449. fh_.key = impl.create_mask();
  450. detail::prepared_key key;
  451. detail::prepare_key(key, fh_.key);
  452. detail::mask_inplace(b, key);
  453. }
  454. fh_.fin = ! more_;
  455. fh_.len = n;
  456. impl.wr_fb.clear();
  457. detail::write<
  458. flat_static_buffer_base>(impl.wr_fb, fh_);
  459. impl.wr_cont = ! fin_;
  460. // Send frame
  461. ASIO_CORO_YIELD
  462. {
  463. ASIO_HANDLER_LOCATION((
  464. __FILE__, __LINE__,
  465. fin_ ?
  466. "websocket::async_write" :
  467. "websocket::async_write_some"
  468. ));
  469. buffers_suffix<Buffers> empty_cb(cb_);
  470. empty_cb.consume(~std::size_t(0));
  471. net::async_write(impl.stream(),
  472. buffers_cat(
  473. net::const_buffer(impl.wr_fb.data()),
  474. net::const_buffer(b),
  475. empty_cb,
  476. buffers_prefix(0, empty_cb)
  477. ),
  478. beast::detail::bind_continuation(std::move(*this)));
  479. }
  480. bytes_transferred_ += in_;
  481. if(impl.check_stop_now(ec))
  482. goto upcall;
  483. if(more_)
  484. {
  // Following frames are continuations and do not carry rsv1.
  485. fh_.op = detail::opcode::cont;
  486. fh_.rsv1 = false;
  487. // Give up the write lock in between each frame
  488. // so that outgoing control frames might be sent.
  489. impl.wr_block.unlock(this);
  490. if( impl.op_close.maybe_invoke()
  491. || impl.op_idle_ping.maybe_invoke()
  492. || impl.op_rd.maybe_invoke()
  493. || impl.op_ping.maybe_invoke())
  494. {
  495. BHO_ASSERT(impl.wr_block.is_locked());
  496. goto do_suspend;
  497. }
  498. impl.wr_block.lock(this);
  499. }
  500. else
  501. {
  // The last frame of this call was sent; run the context takeover
  // step only when that frame had the fin bit set.
  502. if(fh_.fin)
  503. impl.do_context_takeover_write(impl.role);
  504. goto upcall;
  505. }
  506. }
  507. }
  508. //--------------------------------------------------------------------------
  // Common exit: release the write lock, let at most one suspended
  // operation run (short-circuit ||), then invoke the completion handler.
  509. upcall:
  510. impl.wr_block.unlock(this);
  511. impl.op_close.maybe_invoke()
  512. || impl.op_idle_ping.maybe_invoke()
  513. || impl.op_rd.maybe_invoke()
  514. || impl.op_ping.maybe_invoke();
  515. this->complete(cont, ec, bytes_transferred_);
  516. }
  517. }
  518. template<class NextLayer, bool deflateSupported>
  519. struct stream<NextLayer, deflateSupported>::
  520. run_write_some_op
  521. {
  522. template<
  523. class WriteHandler,
  524. class ConstBufferSequence>
  525. void
  526. operator()(
  527. WriteHandler&& h,
  528. std::shared_ptr<impl_type> const& sp,
  529. bool fin,
  530. ConstBufferSequence const& b)
  531. {
  532. // If you get an error on the following line it means
  533. // that your handler does not meet the documented type
  534. // requirements for the handler.
  535. static_assert(
  536. beast::detail::is_invocable<WriteHandler,
  537. void(error_code, std::size_t)>::value,
  538. "WriteHandler type requirements not met");
  539. write_some_op<
  540. typename std::decay<WriteHandler>::type,
  541. ConstBufferSequence>(
  542. std::forward<WriteHandler>(h),
  543. sp,
  544. fin,
  545. b);
  546. }
  547. };
  548. //------------------------------------------------------------------------------
  549. template<class NextLayer, bool deflateSupported>
  550. template<class ConstBufferSequence>
  551. std::size_t
  552. stream<NextLayer, deflateSupported>::
  553. write_some(bool fin, ConstBufferSequence const& buffers)
  554. {
  555. static_assert(is_sync_stream<next_layer_type>::value,
  556. "SyncStream type requirements not met");
  557. static_assert(net::is_const_buffer_sequence<
  558. ConstBufferSequence>::value,
  559. "ConstBufferSequence type requirements not met");
  560. error_code ec;
  561. auto const bytes_transferred =
  562. write_some(fin, buffers, ec);
  563. if(ec)
  564. BHO_THROW_EXCEPTION(system_error{ec});
  565. return bytes_transferred;
  566. }
  567. template<class NextLayer, bool deflateSupported>
  568. template<class ConstBufferSequence>
  569. std::size_t
  570. stream<NextLayer, deflateSupported>::
  571. write_some(bool fin,
  572. ConstBufferSequence const& buffers, error_code& ec)
  573. {
  574. static_assert(is_sync_stream<next_layer_type>::value,
  575. "SyncStream type requirements not met");
  576. static_assert(net::is_const_buffer_sequence<
  577. ConstBufferSequence>::value,
  578. "ConstBufferSequence type requirements not met");
  579. using beast::detail::clamp;
  580. auto& impl = *impl_;
  581. std::size_t bytes_transferred = 0;
  582. ec = {};
  583. if(impl.check_stop_now(ec))
  584. return bytes_transferred;
  585. detail::frame_header fh;
  586. if(! impl.wr_cont)
  587. {
  588. impl.begin_msg(beast::buffer_bytes(buffers));
  589. fh.rsv1 = impl.wr_compress;
  590. }
  591. else
  592. {
  593. fh.rsv1 = false;
  594. }
  595. fh.rsv2 = false;
  596. fh.rsv3 = false;
  597. fh.op = impl.wr_cont ?
  598. detail::opcode::cont : impl.wr_opcode;
  599. fh.mask = impl.role == role_type::client;
  600. auto remain = beast::buffer_bytes(buffers);
  601. if(impl.wr_compress)
  602. {
  603. buffers_suffix<
  604. ConstBufferSequence> cb(buffers);
  605. for(;;)
  606. {
  607. auto b = net::buffer(
  608. impl.wr_buf.get(), impl.wr_buf_size);
  609. auto const more = impl.deflate(
  610. b, cb, fin, bytes_transferred, ec);
  611. if(impl.check_stop_now(ec))
  612. return bytes_transferred;
  613. auto const n = beast::buffer_bytes(b);
  614. if(n == 0)
  615. {
  616. // The input was consumed, but there
  617. // is no output due to compression
  618. // latency.
  619. BHO_ASSERT(! fin);
  620. BHO_ASSERT(beast::buffer_bytes(cb) == 0);
  621. fh.fin = false;
  622. break;
  623. }
  624. if(fh.mask)
  625. {
  626. fh.key = this->impl_->create_mask();
  627. detail::prepared_key key;
  628. detail::prepare_key(key, fh.key);
  629. detail::mask_inplace(b, key);
  630. }
  631. fh.fin = ! more;
  632. fh.len = n;
  633. detail::fh_buffer fh_buf;
  634. detail::write<
  635. flat_static_buffer_base>(fh_buf, fh);
  636. impl.wr_cont = ! fin;
  637. net::write(impl.stream(),
  638. buffers_cat(fh_buf.data(), b), ec);
  639. if(impl.check_stop_now(ec))
  640. return bytes_transferred;
  641. if(! more)
  642. break;
  643. fh.op = detail::opcode::cont;
  644. fh.rsv1 = false;
  645. }
  646. if(fh.fin)
  647. impl.do_context_takeover_write(impl.role);
  648. }
  649. else if(! fh.mask)
  650. {
  651. if(! impl.wr_frag)
  652. {
  653. // no mask, no autofrag
  654. fh.fin = fin;
  655. fh.len = remain;
  656. detail::fh_buffer fh_buf;
  657. detail::write<
  658. flat_static_buffer_base>(fh_buf, fh);
  659. impl.wr_cont = ! fin;
  660. net::write(impl.stream(),
  661. buffers_cat(fh_buf.data(), buffers), ec);
  662. if(impl.check_stop_now(ec))
  663. return bytes_transferred;
  664. bytes_transferred += remain;
  665. }
  666. else
  667. {
  668. // no mask, autofrag
  669. BHO_ASSERT(impl.wr_buf_size != 0);
  670. buffers_suffix<
  671. ConstBufferSequence> cb{buffers};
  672. for(;;)
  673. {
  674. auto const n = clamp(remain, impl.wr_buf_size);
  675. remain -= n;
  676. fh.len = n;
  677. fh.fin = fin ? remain == 0 : false;
  678. detail::fh_buffer fh_buf;
  679. detail::write<
  680. flat_static_buffer_base>(fh_buf, fh);
  681. impl.wr_cont = ! fin;
  682. net::write(impl.stream(),
  683. beast::buffers_cat(fh_buf.data(),
  684. beast::buffers_prefix(n, cb)), ec);
  685. bytes_transferred += n;
  686. if(impl.check_stop_now(ec))
  687. return bytes_transferred;
  688. if(remain == 0)
  689. break;
  690. fh.op = detail::opcode::cont;
  691. cb.consume(n);
  692. }
  693. }
  694. }
  695. else if(! impl.wr_frag)
  696. {
  697. // mask, no autofrag
  698. fh.fin = fin;
  699. fh.len = remain;
  700. fh.key = this->impl_->create_mask();
  701. detail::prepared_key key;
  702. detail::prepare_key(key, fh.key);
  703. detail::fh_buffer fh_buf;
  704. detail::write<
  705. flat_static_buffer_base>(fh_buf, fh);
  706. buffers_suffix<
  707. ConstBufferSequence> cb{buffers};
  708. {
  709. auto const n =
  710. clamp(remain, impl.wr_buf_size);
  711. auto const b =
  712. net::buffer(impl.wr_buf.get(), n);
  713. net::buffer_copy(b, cb);
  714. cb.consume(n);
  715. remain -= n;
  716. detail::mask_inplace(b, key);
  717. impl.wr_cont = ! fin;
  718. net::write(impl.stream(),
  719. buffers_cat(fh_buf.data(), b), ec);
  720. bytes_transferred += n;
  721. if(impl.check_stop_now(ec))
  722. return bytes_transferred;
  723. }
  724. while(remain > 0)
  725. {
  726. auto const n =
  727. clamp(remain, impl.wr_buf_size);
  728. auto const b =
  729. net::buffer(impl.wr_buf.get(), n);
  730. net::buffer_copy(b, cb);
  731. cb.consume(n);
  732. remain -= n;
  733. detail::mask_inplace(b, key);
  734. net::write(impl.stream(), b, ec);
  735. bytes_transferred += n;
  736. if(impl.check_stop_now(ec))
  737. return bytes_transferred;
  738. }
  739. }
  740. else
  741. {
  742. // mask, autofrag
  743. BHO_ASSERT(impl.wr_buf_size != 0);
  744. buffers_suffix<
  745. ConstBufferSequence> cb(buffers);
  746. for(;;)
  747. {
  748. fh.key = this->impl_->create_mask();
  749. detail::prepared_key key;
  750. detail::prepare_key(key, fh.key);
  751. auto const n =
  752. clamp(remain, impl.wr_buf_size);
  753. auto const b =
  754. net::buffer(impl.wr_buf.get(), n);
  755. net::buffer_copy(b, cb);
  756. detail::mask_inplace(b, key);
  757. fh.len = n;
  758. remain -= n;
  759. fh.fin = fin ? remain == 0 : false;
  760. impl.wr_cont = ! fh.fin;
  761. detail::fh_buffer fh_buf;
  762. detail::write<
  763. flat_static_buffer_base>(fh_buf, fh);
  764. net::write(impl.stream(),
  765. buffers_cat(fh_buf.data(), b), ec);
  766. bytes_transferred += n;
  767. if(impl.check_stop_now(ec))
  768. return bytes_transferred;
  769. if(remain == 0)
  770. break;
  771. fh.op = detail::opcode::cont;
  772. cb.consume(n);
  773. }
  774. }
  775. return bytes_transferred;
  776. }
  777. template<class NextLayer, bool deflateSupported>
  778. template<class ConstBufferSequence, BHO_BEAST_ASYNC_TPARAM2 WriteHandler>
  779. BHO_BEAST_ASYNC_RESULT2(WriteHandler)
  780. stream<NextLayer, deflateSupported>::
  781. async_write_some(bool fin,
  782. ConstBufferSequence const& bs, WriteHandler&& handler)
  783. {
  784. static_assert(is_async_stream<next_layer_type>::value,
  785. "AsyncStream type requirements not met");
  786. static_assert(net::is_const_buffer_sequence<
  787. ConstBufferSequence>::value,
  788. "ConstBufferSequence type requirements not met");
  789. return net::async_initiate<
  790. WriteHandler,
  791. void(error_code, std::size_t)>(
  792. run_write_some_op{},
  793. handler,
  794. impl_,
  795. fin,
  796. bs);
  797. }
  798. //------------------------------------------------------------------------------
  799. template<class NextLayer, bool deflateSupported>
  800. template<class ConstBufferSequence>
  801. std::size_t
  802. stream<NextLayer, deflateSupported>::
  803. write(ConstBufferSequence const& buffers)
  804. {
  805. static_assert(is_sync_stream<next_layer_type>::value,
  806. "SyncStream type requirements not met");
  807. static_assert(net::is_const_buffer_sequence<
  808. ConstBufferSequence>::value,
  809. "ConstBufferSequence type requirements not met");
  810. error_code ec;
  811. auto const bytes_transferred = write(buffers, ec);
  812. if(ec)
  813. BHO_THROW_EXCEPTION(system_error{ec});
  814. return bytes_transferred;
  815. }
  816. template<class NextLayer, bool deflateSupported>
  817. template<class ConstBufferSequence>
  818. std::size_t
  819. stream<NextLayer, deflateSupported>::
  820. write(ConstBufferSequence const& buffers, error_code& ec)
  821. {
  822. static_assert(is_sync_stream<next_layer_type>::value,
  823. "SyncStream type requirements not met");
  824. static_assert(net::is_const_buffer_sequence<
  825. ConstBufferSequence>::value,
  826. "ConstBufferSequence type requirements not met");
  827. return write_some(true, buffers, ec);
  828. }
  829. template<class NextLayer, bool deflateSupported>
  830. template<class ConstBufferSequence, BHO_BEAST_ASYNC_TPARAM2 WriteHandler>
  831. BHO_BEAST_ASYNC_RESULT2(WriteHandler)
  832. stream<NextLayer, deflateSupported>::
  833. async_write(
  834. ConstBufferSequence const& bs, WriteHandler&& handler)
  835. {
  836. static_assert(is_async_stream<next_layer_type>::value,
  837. "AsyncStream type requirements not met");
  838. static_assert(net::is_const_buffer_sequence<
  839. ConstBufferSequence>::value,
  840. "ConstBufferSequence type requirements not met");
  841. return net::async_initiate<
  842. WriteHandler,
  843. void(error_code, std::size_t)>(
  844. run_write_some_op{},
  845. handler,
  846. impl_,
  847. true,
  848. bs);
  849. }
  850. } // websocket
  851. } // beast
  852. } // bho
  853. #endif