basic_stream.hpp 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
  10. #define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
  11. #include <boost/beast/core/async_base.hpp>
  12. #include <boost/beast/core/buffer_traits.hpp>
  13. #include <boost/beast/core/buffers_prefix.hpp>
  14. #include <boost/beast/websocket/teardown.hpp>
  15. #include <boost/asio/append.hpp>
  16. #include <boost/asio/coroutine.hpp>
  17. #include <boost/assert.hpp>
  18. #include <boost/make_shared.hpp>
  19. #include <boost/core/exchange.hpp>
  20. #include <cstdlib>
  21. #include <type_traits>
  22. #include <utility>
  23. namespace boost {
  24. namespace beast {
  25. //------------------------------------------------------------------------------
  26. template<class Protocol, class Executor, class RatePolicy>
  27. template<class... Args>
  28. basic_stream<Protocol, Executor, RatePolicy>::
  29. impl_type::
  30. impl_type(std::false_type, Args&&... args)
  31. : socket(std::forward<Args>(args)...)
  32. , read(ex())
  33. , write(ex())
  34. , timer(ex())
  35. {
  36. reset();
  37. }
  38. template<class Protocol, class Executor, class RatePolicy>
  39. template<class RatePolicy_, class... Args>
  40. basic_stream<Protocol, Executor, RatePolicy>::
  41. impl_type::
  42. impl_type(std::true_type,
  43. RatePolicy_&& policy, Args&&... args)
  44. : boost::empty_value<RatePolicy>(
  45. boost::empty_init_t{},
  46. std::forward<RatePolicy_>(policy))
  47. , socket(std::forward<Args>(args)...)
  48. , read(ex())
  49. , write(ex())
  50. , timer(ex())
  51. {
  52. reset();
  53. }
  54. template<class Protocol, class Executor, class RatePolicy>
  55. template<class Executor2>
  56. void
  57. basic_stream<Protocol, Executor, RatePolicy>::
  58. impl_type::
  59. on_timer(Executor2 const& ex2)
  60. {
  61. BOOST_ASSERT(waiting > 0);
  62. // the last waiter starts the new slice
  63. if(--waiting > 0)
  64. return;
  65. // update the expiration time
  66. BOOST_VERIFY(timer.expires_after(
  67. std::chrono::seconds(1)) == 0);
  68. rate_policy_access::on_timer(policy());
  69. struct handler : boost::empty_value<Executor2>
  70. {
  71. boost::weak_ptr<impl_type> wp;
  72. using executor_type = Executor2;
  73. executor_type
  74. get_executor() const noexcept
  75. {
  76. return this->get();
  77. }
  78. handler(
  79. Executor2 const& ex2,
  80. boost::shared_ptr<impl_type> const& sp)
  81. : boost::empty_value<Executor2>(
  82. boost::empty_init_t{}, ex2)
  83. , wp(sp)
  84. {
  85. }
  86. void
  87. operator()(error_code ec)
  88. {
  89. auto sp = wp.lock();
  90. if(! sp)
  91. return;
  92. if(ec == net::error::operation_aborted)
  93. return;
  94. BOOST_ASSERT(! ec);
  95. if(ec)
  96. return;
  97. sp->on_timer(this->get());
  98. }
  99. };
  100. // wait on the timer again
  101. ++waiting;
  102. timer.async_wait(handler(ex2, this->shared_from_this()));
  103. }
  104. template<class Protocol, class Executor, class RatePolicy>
  105. void
  106. basic_stream<Protocol, Executor, RatePolicy>::
  107. impl_type::
  108. reset()
  109. {
  110. // If assert goes off, it means that there are
  111. // already read or write (or connect) operations
  112. // outstanding, so there is nothing to apply
  113. // the expiration time to!
  114. //
  115. BOOST_ASSERT(! read.pending || ! write.pending);
  116. if(! read.pending)
  117. BOOST_VERIFY(
  118. read.timer.expires_at(never()) == 0);
  119. if(! write.pending)
  120. BOOST_VERIFY(
  121. write.timer.expires_at(never()) == 0);
  122. }
  123. template<class Protocol, class Executor, class RatePolicy>
  124. void
  125. basic_stream<Protocol, Executor, RatePolicy>::
  126. impl_type::
  127. close() noexcept
  128. {
  129. {
  130. error_code ec;
  131. socket.close(ec);
  132. }
  133. #if !defined(BOOST_NO_EXCEPTIONS)
  134. try
  135. {
  136. timer.cancel();
  137. }
  138. catch(...)
  139. {
  140. }
  141. #else
  142. timer.cancel();
  143. #endif
  144. }
  145. //------------------------------------------------------------------------------
  146. template<class Protocol, class Executor, class RatePolicy>
  147. template<class Executor2>
  148. struct basic_stream<Protocol, Executor, RatePolicy>::
  149. timeout_handler
  150. {
  151. using executor_type = Executor2;
  152. op_state& state;
  153. boost::weak_ptr<impl_type> wp;
  154. tick_type tick;
  155. executor_type ex;
  156. executor_type get_executor() const noexcept
  157. {
  158. return ex;
  159. }
  160. void
  161. operator()(error_code ec)
  162. {
  163. // timer canceled
  164. if(ec == net::error::operation_aborted)
  165. return;
  166. BOOST_ASSERT(! ec);
  167. auto sp = wp.lock();
  168. // stream destroyed
  169. if(! sp)
  170. return;
  171. // stale timer
  172. if(tick < state.tick)
  173. return;
  174. BOOST_ASSERT(tick == state.tick);
  175. // timeout
  176. BOOST_ASSERT(! state.timeout);
  177. sp->close();
  178. state.timeout = true;
  179. }
  180. };
  181. //------------------------------------------------------------------------------
  182. template<class Protocol, class Executor, class RatePolicy>
  183. struct basic_stream<Protocol, Executor, RatePolicy>::ops
  184. {
  185. template<bool isRead, class Buffers, class Handler>
  186. class transfer_op
  187. : public async_base<Handler, Executor>
  188. , public boost::asio::coroutine
  189. {
  190. boost::shared_ptr<impl_type> impl_;
  191. pending_guard pg_;
  192. Buffers b_;
  193. using is_read = std::integral_constant<bool, isRead>;
  194. op_state&
  195. state()
  196. {
  197. if (isRead)
  198. return impl_->read;
  199. else
  200. return impl_->write;
  201. }
  202. std::size_t
  203. available_bytes()
  204. {
  205. if (isRead)
  206. return rate_policy_access::
  207. available_read_bytes(impl_->policy());
  208. else
  209. return rate_policy_access::
  210. available_write_bytes(impl_->policy());
  211. }
  212. void
  213. transfer_bytes(std::size_t n)
  214. {
  215. if (isRead)
  216. rate_policy_access::
  217. transfer_read_bytes(impl_->policy(), n);
  218. else
  219. rate_policy_access::
  220. transfer_write_bytes(impl_->policy(), n);
  221. }
  222. void
  223. async_perform(
  224. std::size_t amount, std::true_type)
  225. {
  226. impl_->socket.async_read_some(
  227. beast::buffers_prefix(amount, b_),
  228. std::move(*this));
  229. }
  230. void
  231. async_perform(
  232. std::size_t amount, std::false_type)
  233. {
  234. impl_->socket.async_write_some(
  235. beast::buffers_prefix(amount, b_),
  236. std::move(*this));
  237. }
  238. static bool never_pending_;
  239. public:
  240. template<class Handler_>
  241. transfer_op(
  242. Handler_&& h,
  243. basic_stream& s,
  244. Buffers const& b)
  245. : async_base<Handler, Executor>(
  246. std::forward<Handler_>(h), s.get_executor())
  247. , impl_(s.impl_)
  248. , pg_()
  249. , b_(b)
  250. {
  251. this->set_allowed_cancellation(net::cancellation_type::all);
  252. if (buffer_bytes(b_) == 0 && state().pending)
  253. {
  254. // Workaround:
  255. // Corner case discovered in https://github.com/boostorg/beast/issues/2065
  256. // Enclosing SSL stream wishes to complete a 0-length write early by
  257. // executing a 0-length read against the underlying stream.
  258. // This can occur even if an existing async_read is in progress.
  259. // In this specific case, we will complete the async op with no error
  260. // in order to prevent assertions and/or internal corruption of the basic_stream
  261. this->complete(false, error_code(), std::size_t{0});
  262. }
  263. else
  264. {
  265. pg_.assign(state().pending);
  266. (*this)({});
  267. }
  268. }
  269. void
  270. operator()(
  271. error_code ec,
  272. std::size_t bytes_transferred = 0)
  273. {
  274. BOOST_ASIO_CORO_REENTER(*this)
  275. {
  276. // apply the timeout manually, otherwise
  277. // behavior varies across platforms.
  278. if(state().timer.expiry() <= now())
  279. {
  280. BOOST_ASIO_CORO_YIELD
  281. {
  282. BOOST_ASIO_HANDLER_LOCATION((
  283. __FILE__, __LINE__,
  284. (isRead ? "basic_stream::async_read_some"
  285. : "basic_stream::async_write_some")));
  286. net::dispatch(this->get_immediate_executor(),
  287. net::append(std::move(*this), ec, 0));
  288. }
  289. impl_->close();
  290. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  291. goto upcall;
  292. }
  293. // handle empty buffers
  294. if(detail::buffers_empty(b_))
  295. {
  296. // make sure we perform the no-op
  297. BOOST_ASIO_CORO_YIELD
  298. {
  299. BOOST_ASIO_HANDLER_LOCATION((
  300. __FILE__, __LINE__,
  301. (isRead ? "basic_stream::async_read_some"
  302. : "basic_stream::async_write_some")));
  303. async_perform(0, is_read{});
  304. }
  305. goto upcall;
  306. }
  307. // if a timeout is active, wait on the timer
  308. if(state().timer.expiry() != never())
  309. {
  310. BOOST_ASIO_HANDLER_LOCATION((
  311. __FILE__, __LINE__,
  312. (isRead ? "basic_stream::async_read_some"
  313. : "basic_stream::async_write_some")));
  314. state().timer.async_wait(
  315. timeout_handler<decltype(this->get_executor())>{
  316. state(),
  317. impl_,
  318. state().tick,
  319. this->get_executor()});
  320. }
  321. // check rate limit, maybe wait
  322. std::size_t amount;
  323. amount = available_bytes();
  324. if(amount == 0)
  325. {
  326. ++impl_->waiting;
  327. BOOST_ASIO_CORO_YIELD
  328. {
  329. BOOST_ASIO_HANDLER_LOCATION((
  330. __FILE__, __LINE__,
  331. (isRead ? "basic_stream::async_read_some"
  332. : "basic_stream::async_write_some")));
  333. impl_->timer.async_wait(std::move(*this));
  334. }
  335. if(ec)
  336. {
  337. // socket was closed, or a timeout
  338. BOOST_ASSERT(ec ==
  339. net::error::operation_aborted);
  340. // timeout handler invoked?
  341. if(state().timeout)
  342. {
  343. // yes, socket already closed
  344. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  345. state().timeout = false;
  346. }
  347. goto upcall;
  348. }
  349. impl_->on_timer(this->get_executor());
  350. // Allow at least one byte, otherwise
  351. // bytes_transferred could be 0.
  352. amount = std::max<std::size_t>(
  353. available_bytes(), 1);
  354. }
  355. BOOST_ASIO_CORO_YIELD
  356. {
  357. BOOST_ASIO_HANDLER_LOCATION((
  358. __FILE__, __LINE__,
  359. (isRead ? "basic_stream::async_read_some"
  360. : "basic_stream::async_write_some")));
  361. async_perform(amount, is_read{});
  362. }
  363. if(state().timer.expiry() != never())
  364. {
  365. ++state().tick;
  366. // try cancelling timer
  367. auto const n =
  368. state().timer.cancel();
  369. if(n == 0)
  370. {
  371. // timeout handler invoked?
  372. if(state().timeout)
  373. {
  374. // yes, socket already closed
  375. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  376. state().timeout = false;
  377. }
  378. }
  379. else
  380. {
  381. BOOST_ASSERT(n == 1);
  382. BOOST_ASSERT(! state().timeout);
  383. }
  384. }
  385. upcall:
  386. pg_.reset();
  387. transfer_bytes(bytes_transferred);
  388. this->complete_now(ec, bytes_transferred);
  389. }
  390. }
  391. };
  392. template<class Handler>
  393. class connect_op
  394. : public async_base<Handler, Executor>
  395. {
  396. boost::shared_ptr<impl_type> impl_;
  397. pending_guard pg0_;
  398. pending_guard pg1_;
  399. op_state&
  400. state() noexcept
  401. {
  402. return impl_->write;
  403. }
  404. public:
  405. template<class Handler_>
  406. connect_op(
  407. Handler_&& h,
  408. basic_stream& s,
  409. endpoint_type ep)
  410. : async_base<Handler, Executor>(
  411. std::forward<Handler_>(h), s.get_executor())
  412. , impl_(s.impl_)
  413. , pg0_(impl_->read.pending)
  414. , pg1_(impl_->write.pending)
  415. {
  416. this->set_allowed_cancellation(net::cancellation_type::all);
  417. if(state().timer.expiry() != stream_base::never())
  418. {
  419. BOOST_ASIO_HANDLER_LOCATION((
  420. __FILE__, __LINE__,
  421. "basic_stream::async_connect"));
  422. impl_->write.timer.async_wait(
  423. timeout_handler<decltype(this->get_executor())>{
  424. state(),
  425. impl_,
  426. state().tick,
  427. this->get_executor()});
  428. }
  429. BOOST_ASIO_HANDLER_LOCATION((
  430. __FILE__, __LINE__,
  431. "basic_stream::async_connect"));
  432. impl_->socket.async_connect(
  433. ep, std::move(*this));
  434. // *this is now moved-from
  435. }
  436. template<
  437. class Endpoints, class Condition,
  438. class Handler_>
  439. connect_op(
  440. Handler_&& h,
  441. basic_stream& s,
  442. Endpoints const& eps,
  443. Condition const& cond)
  444. : async_base<Handler, Executor>(
  445. std::forward<Handler_>(h), s.get_executor())
  446. , impl_(s.impl_)
  447. , pg0_(impl_->read.pending)
  448. , pg1_(impl_->write.pending)
  449. {
  450. this->set_allowed_cancellation(net::cancellation_type::all);
  451. if(state().timer.expiry() != stream_base::never())
  452. {
  453. BOOST_ASIO_HANDLER_LOCATION((
  454. __FILE__, __LINE__,
  455. "basic_stream::async_connect"));
  456. impl_->write.timer.async_wait(
  457. timeout_handler<decltype(this->get_executor())>{
  458. state(),
  459. impl_,
  460. state().tick,
  461. this->get_executor()});
  462. }
  463. BOOST_ASIO_HANDLER_LOCATION((
  464. __FILE__, __LINE__,
  465. "basic_stream::async_connect"));
  466. net::async_connect(impl_->socket,
  467. eps, cond, std::move(*this));
  468. // *this is now moved-from
  469. }
  470. template<
  471. class Iterator, class Condition,
  472. class Handler_>
  473. connect_op(
  474. Handler_&& h,
  475. basic_stream& s,
  476. Iterator begin, Iterator end,
  477. Condition const& cond)
  478. : async_base<Handler, Executor>(
  479. std::forward<Handler_>(h), s.get_executor())
  480. , impl_(s.impl_)
  481. , pg0_(impl_->read.pending)
  482. , pg1_(impl_->write.pending)
  483. {
  484. this->set_allowed_cancellation(net::cancellation_type::all);
  485. if(state().timer.expiry() != stream_base::never())
  486. {
  487. BOOST_ASIO_HANDLER_LOCATION((
  488. __FILE__, __LINE__,
  489. "basic_stream::async_connect"));
  490. impl_->write.timer.async_wait(
  491. timeout_handler<decltype(this->get_executor())>{
  492. state(),
  493. impl_,
  494. state().tick,
  495. this->get_executor()});
  496. }
  497. BOOST_ASIO_HANDLER_LOCATION((
  498. __FILE__, __LINE__,
  499. "basic_stream::async_connect"));
  500. net::async_connect(impl_->socket,
  501. begin, end, cond, std::move(*this));
  502. // *this is now moved-from
  503. }
  504. template<class... Args>
  505. void
  506. operator()(error_code ec, Args&&... args)
  507. {
  508. if(state().timer.expiry() != stream_base::never())
  509. {
  510. ++state().tick;
  511. // try cancelling timer
  512. auto const n =
  513. impl_->write.timer.cancel();
  514. if(n == 0)
  515. {
  516. // timeout handler invoked?
  517. if(state().timeout)
  518. {
  519. // yes, socket already closed
  520. BOOST_BEAST_ASSIGN_EC(ec, beast::error::timeout);
  521. state().timeout = false;
  522. }
  523. }
  524. else
  525. {
  526. BOOST_ASSERT(n == 1);
  527. BOOST_ASSERT(! state().timeout);
  528. }
  529. }
  530. pg0_.reset();
  531. pg1_.reset();
  532. this->complete_now(ec, std::forward<Args>(args)...);
  533. }
  534. };
  535. struct run_read_op
  536. {
  537. basic_stream* self;
  538. using executor_type = typename basic_stream::executor_type;
  539. executor_type
  540. get_executor() const noexcept
  541. {
  542. return self->get_executor();
  543. }
  544. template<class ReadHandler, class Buffers>
  545. void
  546. operator()(
  547. ReadHandler&& h,
  548. Buffers const& b)
  549. {
  550. // If you get an error on the following line it means
  551. // that your handler does not meet the documented type
  552. // requirements for the handler.
  553. static_assert(
  554. detail::is_invocable<ReadHandler,
  555. void(error_code, std::size_t)>::value,
  556. "ReadHandler type requirements not met");
  557. transfer_op<
  558. true,
  559. Buffers,
  560. typename std::decay<ReadHandler>::type>(
  561. std::forward<ReadHandler>(h), *self, b);
  562. }
  563. };
  564. struct run_write_op
  565. {
  566. basic_stream* self;
  567. using executor_type = typename basic_stream::executor_type;
  568. executor_type
  569. get_executor() const noexcept
  570. {
  571. return self->get_executor();
  572. }
  573. template<class WriteHandler, class Buffers>
  574. void
  575. operator()(
  576. WriteHandler&& h,
  577. Buffers const& b)
  578. {
  579. // If you get an error on the following line it means
  580. // that your handler does not meet the documented type
  581. // requirements for the handler.
  582. static_assert(
  583. detail::is_invocable<WriteHandler,
  584. void(error_code, std::size_t)>::value,
  585. "WriteHandler type requirements not met");
  586. transfer_op<
  587. false,
  588. Buffers,
  589. typename std::decay<WriteHandler>::type>(
  590. std::forward<WriteHandler>(h), *self, b);
  591. }
  592. };
  593. struct run_connect_op
  594. {
  595. basic_stream* self;
  596. using executor_type = typename basic_stream::executor_type;
  597. executor_type
  598. get_executor() const noexcept
  599. {
  600. return self->get_executor();
  601. }
  602. template<class ConnectHandler>
  603. void
  604. operator()(
  605. ConnectHandler&& h,
  606. endpoint_type const& ep)
  607. {
  608. // If you get an error on the following line it means
  609. // that your handler does not meet the documented type
  610. // requirements for the handler.
  611. static_assert(
  612. detail::is_invocable<ConnectHandler,
  613. void(error_code)>::value,
  614. "ConnectHandler type requirements not met");
  615. connect_op<typename std::decay<ConnectHandler>::type>(
  616. std::forward<ConnectHandler>(h), *self, ep);
  617. }
  618. };
  619. struct run_connect_range_op
  620. {
  621. basic_stream* self;
  622. using executor_type = typename basic_stream::executor_type;
  623. executor_type
  624. get_executor() const noexcept
  625. {
  626. return self->get_executor();
  627. }
  628. template<
  629. class RangeConnectHandler,
  630. class EndpointSequence,
  631. class Condition>
  632. void
  633. operator()(
  634. RangeConnectHandler&& h,
  635. EndpointSequence const& eps,
  636. Condition const& cond)
  637. {
  638. // If you get an error on the following line it means
  639. // that your handler does not meet the documented type
  640. // requirements for the handler.
  641. static_assert(
  642. detail::is_invocable<RangeConnectHandler,
  643. void(error_code, typename Protocol::endpoint)>::value,
  644. "RangeConnectHandler type requirements not met");
  645. connect_op<typename std::decay<RangeConnectHandler>::type>(
  646. std::forward<RangeConnectHandler>(h), *self, eps, cond);
  647. }
  648. };
  649. struct run_connect_iter_op
  650. {
  651. basic_stream* self;
  652. using executor_type = typename basic_stream::executor_type;
  653. executor_type
  654. get_executor() const noexcept
  655. {
  656. return self->get_executor();
  657. }
  658. template<
  659. class IteratorConnectHandler,
  660. class Iterator,
  661. class Condition>
  662. void
  663. operator()(
  664. IteratorConnectHandler&& h,
  665. Iterator begin, Iterator end,
  666. Condition const& cond)
  667. {
  668. // If you get an error on the following line it means
  669. // that your handler does not meet the documented type
  670. // requirements for the handler.
  671. static_assert(
  672. detail::is_invocable<IteratorConnectHandler,
  673. void(error_code, Iterator)>::value,
  674. "IteratorConnectHandler type requirements not met");
  675. connect_op<typename std::decay<IteratorConnectHandler>::type>(
  676. std::forward<IteratorConnectHandler>(h), *self, begin, end, cond);
  677. }
  678. };
  679. };
  680. //------------------------------------------------------------------------------
  681. template<class Protocol, class Executor, class RatePolicy>
  682. basic_stream<Protocol, Executor, RatePolicy>::
  683. ~basic_stream()
  684. {
  685. // the shared object can outlive *this,
  686. // cancel any operations so the shared
  687. // object is destroyed as soon as possible.
  688. impl_->close();
  689. }
  690. template<class Protocol, class Executor, class RatePolicy>
  691. template<class Arg0, class... Args, class>
  692. basic_stream<Protocol, Executor, RatePolicy>::
  693. basic_stream(Arg0&& arg0, Args&&... args)
  694. : impl_(boost::make_shared<impl_type>(
  695. std::false_type{},
  696. std::forward<Arg0>(arg0),
  697. std::forward<Args>(args)...))
  698. {
  699. }
  700. template<class Protocol, class Executor, class RatePolicy>
  701. template<class RatePolicy_, class Arg0, class... Args, class>
  702. basic_stream<Protocol, Executor, RatePolicy>::
  703. basic_stream(
  704. RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
  705. : impl_(boost::make_shared<impl_type>(
  706. std::true_type{},
  707. std::forward<RatePolicy_>(policy),
  708. std::forward<Arg0>(arg0),
  709. std::forward<Args>(args)...))
  710. {
  711. }
  712. template<class Protocol, class Executor, class RatePolicy>
  713. basic_stream<Protocol, Executor, RatePolicy>::
  714. basic_stream(basic_stream&& other)
  715. : impl_(boost::make_shared<impl_type>(
  716. std::move(*other.impl_)))
  717. {
  718. // Explainer: Asio's sockets provide the guarantee that a moved-from socket
  719. // will be in a state as-if newly created. i.e.:
  720. // * having the same (valid) executor
  721. // * the socket shall not be open
  722. // We provide the same guarantee by moving the impl rather than the pointer
  723. // controlling its lifetime.
  724. }
  725. template<class Protocol, class Executor, class RatePolicy>
  726. template<class Executor_>
  727. basic_stream<Protocol, Executor, RatePolicy>::
  728. basic_stream(basic_stream<Protocol, Executor_, RatePolicy> && other)
  729. : impl_(boost::make_shared<impl_type>(std::false_type{}, std::move(other.impl_->socket)))
  730. {
  731. }
  732. //------------------------------------------------------------------------------
  733. template<class Protocol, class Executor, class RatePolicy>
  734. auto
  735. basic_stream<Protocol, Executor, RatePolicy>::
  736. release_socket() ->
  737. socket_type
  738. {
  739. this->cancel();
  740. return std::move(impl_->socket);
  741. }
  742. template<class Protocol, class Executor, class RatePolicy>
  743. void
  744. basic_stream<Protocol, Executor, RatePolicy>::
  745. expires_after(net::steady_timer::duration expiry_time)
  746. {
  747. // If assert goes off, it means that there are
  748. // already read or write (or connect) operations
  749. // outstanding, so there is nothing to apply
  750. // the expiration time to!
  751. //
  752. BOOST_ASSERT(
  753. ! impl_->read.pending ||
  754. ! impl_->write.pending);
  755. if(! impl_->read.pending)
  756. BOOST_VERIFY(
  757. impl_->read.timer.expires_after(
  758. expiry_time) == 0);
  759. if(! impl_->write.pending)
  760. BOOST_VERIFY(
  761. impl_->write.timer.expires_after(
  762. expiry_time) == 0);
  763. }
  764. template<class Protocol, class Executor, class RatePolicy>
  765. void
  766. basic_stream<Protocol, Executor, RatePolicy>::
  767. expires_at(
  768. net::steady_timer::time_point expiry_time)
  769. {
  770. // If assert goes off, it means that there are
  771. // already read or write (or connect) operations
  772. // outstanding, so there is nothing to apply
  773. // the expiration time to!
  774. //
  775. BOOST_ASSERT(
  776. ! impl_->read.pending ||
  777. ! impl_->write.pending);
  778. if(! impl_->read.pending)
  779. BOOST_VERIFY(
  780. impl_->read.timer.expires_at(
  781. expiry_time) == 0);
  782. if(! impl_->write.pending)
  783. BOOST_VERIFY(
  784. impl_->write.timer.expires_at(
  785. expiry_time) == 0);
  786. }
  787. template<class Protocol, class Executor, class RatePolicy>
  788. void
  789. basic_stream<Protocol, Executor, RatePolicy>::
  790. expires_never()
  791. {
  792. impl_->reset();
  793. }
  794. template<class Protocol, class Executor, class RatePolicy>
  795. void
  796. basic_stream<Protocol, Executor, RatePolicy>::
  797. cancel()
  798. {
  799. error_code ec;
  800. impl_->socket.cancel(ec);
  801. impl_->timer.cancel();
  802. }
  803. template<class Protocol, class Executor, class RatePolicy>
  804. void
  805. basic_stream<Protocol, Executor, RatePolicy>::
  806. close()
  807. {
  808. impl_->close();
  809. }
  810. //------------------------------------------------------------------------------
  811. template<class Protocol, class Executor, class RatePolicy>
  812. template<BOOST_BEAST_ASYNC_TPARAM1 ConnectHandler>
  813. BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
  814. basic_stream<Protocol, Executor, RatePolicy>::
  815. async_connect(
  816. endpoint_type const& ep,
  817. ConnectHandler&& handler)
  818. {
  819. return net::async_initiate<
  820. ConnectHandler,
  821. void(error_code)>(
  822. typename ops::run_connect_op{this},
  823. handler,
  824. ep);
  825. }
  826. template<class Protocol, class Executor, class RatePolicy>
  827. template<
  828. class EndpointSequence,
  829. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
  830. class,
  831. class>
  832. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
  833. basic_stream<Protocol, Executor, RatePolicy>::
  834. async_connect(
  835. EndpointSequence const& endpoints,
  836. RangeConnectHandler&& handler)
  837. {
  838. return net::async_initiate<
  839. RangeConnectHandler,
  840. void(error_code, typename Protocol::endpoint)>(
  841. typename ops::run_connect_range_op{this},
  842. handler,
  843. endpoints,
  844. detail::any_endpoint{});
  845. }
  846. template<class Protocol, class Executor, class RatePolicy>
  847. template<
  848. class EndpointSequence,
  849. class ConnectCondition,
  850. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
  851. class,
  852. class>
  853. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
  854. basic_stream<Protocol, Executor, RatePolicy>::
  855. async_connect(
  856. EndpointSequence const& endpoints,
  857. ConnectCondition connect_condition,
  858. RangeConnectHandler&& handler)
  859. {
  860. return net::async_initiate<
  861. RangeConnectHandler,
  862. void(error_code, typename Protocol::endpoint)>(
  863. typename ops::run_connect_range_op{this},
  864. handler,
  865. endpoints,
  866. connect_condition);
  867. }
  868. template<class Protocol, class Executor, class RatePolicy>
  869. template<
  870. class Iterator,
  871. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler,
  872. class>
  873. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
  874. basic_stream<Protocol, Executor, RatePolicy>::
  875. async_connect(
  876. Iterator begin, Iterator end,
  877. IteratorConnectHandler&& handler)
  878. {
  879. return net::async_initiate<
  880. IteratorConnectHandler,
  881. void(error_code, Iterator)>(
  882. typename ops::run_connect_iter_op{this},
  883. handler,
  884. begin, end,
  885. detail::any_endpoint{});
  886. }
  887. template<class Protocol, class Executor, class RatePolicy>
  888. template<
  889. class Iterator,
  890. class ConnectCondition,
  891. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler,
  892. class>
  893. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
  894. basic_stream<Protocol, Executor, RatePolicy>::
  895. async_connect(
  896. Iterator begin, Iterator end,
  897. ConnectCondition connect_condition,
  898. IteratorConnectHandler&& handler)
  899. {
  900. return net::async_initiate<
  901. IteratorConnectHandler,
  902. void(error_code, Iterator)>(
  903. typename ops::run_connect_iter_op{this},
  904. handler,
  905. begin, end,
  906. connect_condition);
  907. }
  908. //------------------------------------------------------------------------------
  909. template<class Protocol, class Executor, class RatePolicy>
  910. template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  911. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  912. basic_stream<Protocol, Executor, RatePolicy>::
  913. async_read_some(
  914. MutableBufferSequence const& buffers,
  915. ReadHandler&& handler)
  916. {
  917. static_assert(net::is_mutable_buffer_sequence<
  918. MutableBufferSequence>::value,
  919. "MutableBufferSequence type requirements not met");
  920. return net::async_initiate<
  921. ReadHandler,
  922. void(error_code, std::size_t)>(
  923. typename ops::run_read_op{this},
  924. handler,
  925. buffers);
  926. }
  927. template<class Protocol, class Executor, class RatePolicy>
  928. template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
  929. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  930. basic_stream<Protocol, Executor, RatePolicy>::
  931. async_write_some(
  932. ConstBufferSequence const& buffers,
  933. WriteHandler&& handler)
  934. {
  935. static_assert(net::is_const_buffer_sequence<
  936. ConstBufferSequence>::value,
  937. "ConstBufferSequence type requirements not met");
  938. return net::async_initiate<
  939. WriteHandler,
  940. void(error_code, std::size_t)>(
  941. typename ops::run_write_op{this},
  942. handler,
  943. buffers);
  944. }
  945. //------------------------------------------------------------------------------
  946. //
  947. // Customization points
  948. //
  949. #if ! BOOST_BEAST_DOXYGEN
  950. template<
  951. class Protocol, class Executor, class RatePolicy>
  952. void
  953. beast_close_socket(
  954. basic_stream<Protocol, Executor, RatePolicy>& stream)
  955. {
  956. error_code ec;
  957. stream.socket().close(ec);
  958. }
  959. template<
  960. class Protocol, class Executor, class RatePolicy>
  961. void
  962. teardown(
  963. role_type role,
  964. basic_stream<Protocol, Executor, RatePolicy>& stream,
  965. error_code& ec)
  966. {
  967. using beast::websocket::teardown;
  968. teardown(role, stream.socket(), ec);
  969. }
  970. template<
  971. class Protocol, class Executor, class RatePolicy,
  972. class TeardownHandler>
  973. void
  974. async_teardown(
  975. role_type role,
  976. basic_stream<Protocol, Executor, RatePolicy>& stream,
  977. TeardownHandler&& handler)
  978. {
  979. using beast::websocket::async_teardown;
  980. async_teardown(role, stream.socket(),
  981. std::forward<TeardownHandler>(handler));
  982. }
  983. #endif
  984. } // beast
  985. } // boost
  986. #endif