stream.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
  11. #include <boost/beast/core/buffer_traits.hpp>
  12. #include <boost/beast/core/detail/service_base.hpp>
  13. #include <boost/beast/core/detail/is_invocable.hpp>
  14. #include <boost/asio/any_io_executor.hpp>
  15. #include <boost/asio/append.hpp>
  16. #include <boost/asio/associated_cancellation_slot.hpp>
  17. #include <boost/asio/dispatch.hpp>
  18. #include <boost/asio/post.hpp>
  19. #include <mutex>
  20. #include <stdexcept>
  21. #include <vector>
  22. namespace boost {
  23. namespace beast {
  24. namespace test {
  25. namespace detail
  26. {
  27. template<class To>
  28. struct extract_executor_op
  29. {
  30. To operator()(net::any_io_executor& ex) const
  31. {
  32. assert(ex.template target<To>());
  33. return *ex.template target<To>();
  34. }
  35. };
  36. template<>
  37. struct extract_executor_op<net::any_io_executor>
  38. {
  39. net::any_io_executor operator()(net::any_io_executor& ex) const
  40. {
  41. return ex;
  42. }
  43. };
  44. } // detail
  45. template<class Executor>
  46. template<class Handler, class Buffers>
  47. class basic_stream<Executor>::read_op : public detail::stream_read_op_base
  48. {
  49. struct lambda
  50. {
  51. Handler h_;
  52. boost::weak_ptr<detail::stream_state> wp_;
  53. Buffers b_;
  54. #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
  55. net::any_io_executor wg2_;
  56. #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  57. net::executor_work_guard<
  58. net::associated_executor_t<Handler, net::any_io_executor>> wg2_;
  59. #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  60. lambda(lambda&&) = default;
  61. lambda(lambda const&) = default;
  62. template<class Handler_>
  63. lambda(
  64. Handler_&& h,
  65. boost::shared_ptr<detail::stream_state> const& s,
  66. Buffers const& b)
  67. : h_(std::forward<Handler_>(h))
  68. , wp_(s)
  69. , b_(b)
  70. #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
  71. , wg2_(net::prefer(
  72. net::get_associated_executor(
  73. h_, s->exec),
  74. net::execution::outstanding_work.tracked))
  75. #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  76. , wg2_(net::get_associated_executor(
  77. h_, s->exec))
  78. #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  79. {
  80. }
  81. using allocator_type = net::associated_allocator_t<Handler>;
  82. allocator_type get_allocator() const noexcept
  83. {
  84. return net::get_associated_allocator(h_);
  85. }
  86. using cancellation_slot_type =
  87. net::associated_cancellation_slot_t<Handler>;
  88. cancellation_slot_type
  89. get_cancellation_slot() const noexcept
  90. {
  91. return net::get_associated_cancellation_slot(h_,
  92. net::cancellation_slot());
  93. }
  94. void
  95. operator()(error_code ec)
  96. {
  97. std::size_t bytes_transferred = 0;
  98. auto sp = wp_.lock();
  99. if(! sp)
  100. {
  101. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  102. }
  103. if(! ec)
  104. {
  105. std::lock_guard<std::mutex> lock(sp->m);
  106. BOOST_ASSERT(! sp->op);
  107. if(sp->b.size() > 0)
  108. {
  109. bytes_transferred =
  110. net::buffer_copy(
  111. b_, sp->b.data(), sp->read_max);
  112. sp->b.consume(bytes_transferred);
  113. sp->nread_bytes += bytes_transferred;
  114. }
  115. else if (buffer_bytes(b_) > 0)
  116. {
  117. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  118. }
  119. }
  120. #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
  121. net::dispatch(wg2_,
  122. net::append(std::move(h_), ec, bytes_transferred));
  123. wg2_ = net::any_io_executor(); // probably unnecessary
  124. #else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  125. net::dispatch(wg2_.get_executor(),
  126. net::append(std::move(h_), ec, bytes_transferred));
  127. wg2_.reset();
  128. #endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
  129. }
  130. };
  131. lambda fn_;
  132. #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
  133. net::executor_work_guard<net::any_io_executor> wg1_;
  134. #else
  135. net::any_io_executor wg1_;
  136. #endif
  137. public:
  138. template<class Handler_>
  139. read_op(
  140. Handler_&& h,
  141. boost::shared_ptr<detail::stream_state> const& s,
  142. Buffers const& b)
  143. : fn_(std::forward<Handler_>(h), s, b)
  144. #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
  145. , wg1_(s->exec)
  146. #else
  147. , wg1_(net::prefer(s->exec,
  148. net::execution::outstanding_work.tracked))
  149. #endif
  150. {
  151. }
  152. void
  153. operator()(error_code ec) override
  154. {
  155. #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
  156. net::post(wg1_.get_executor(), net::append(std::move(fn_), ec));
  157. wg1_.reset();
  158. #else
  159. net::post(wg1_, net::append(std::move(fn_), ec));
  160. wg1_ = net::any_io_executor(); // probably unnecessary
  161. #endif
  162. }
  163. };
  164. template<class Executor>
  165. struct basic_stream<Executor>::run_read_op
  166. {
  167. boost::shared_ptr<detail::stream_state> const& in;
  168. using executor_type = typename basic_stream::executor_type;
  169. executor_type
  170. get_executor() const noexcept
  171. {
  172. return detail::extract_executor_op<Executor>()(in->exec);
  173. }
  174. template<
  175. class ReadHandler,
  176. class MutableBufferSequence>
  177. void
  178. operator()(
  179. ReadHandler&& h,
  180. MutableBufferSequence const& buffers)
  181. {
  182. // If you get an error on the following line it means
  183. // that your handler does not meet the documented type
  184. // requirements for the handler.
  185. static_assert(
  186. beast::detail::is_invocable<ReadHandler,
  187. void(error_code, std::size_t)>::value,
  188. "ReadHandler type requirements not met");
  189. initiate_read(
  190. in,
  191. std::unique_ptr<detail::stream_read_op_base>{
  192. new read_op<
  193. typename std::decay<ReadHandler>::type,
  194. MutableBufferSequence>(
  195. std::move(h),
  196. in,
  197. buffers)},
  198. buffer_bytes(buffers));
  199. }
  200. };
  201. template<class Executor>
  202. struct basic_stream<Executor>::run_write_op
  203. {
  204. boost::shared_ptr<detail::stream_state> const& in_;
  205. using executor_type = typename basic_stream::executor_type;
  206. executor_type
  207. get_executor() const noexcept
  208. {
  209. return detail::extract_executor_op<Executor>()(in_->exec);
  210. }
  211. template<
  212. class WriteHandler,
  213. class ConstBufferSequence>
  214. void
  215. operator()(
  216. WriteHandler&& h,
  217. boost::weak_ptr<detail::stream_state> out_,
  218. ConstBufferSequence const& buffers)
  219. {
  220. // If you get an error on the following line it means
  221. // that your handler does not meet the documented type
  222. // requirements for the handler.
  223. static_assert(
  224. beast::detail::is_invocable<WriteHandler,
  225. void(error_code, std::size_t)>::value,
  226. "WriteHandler type requirements not met");
  227. ++in_->nwrite;
  228. auto const upcall = [&](error_code ec, std::size_t n)
  229. {
  230. net::post(in_->exec, net::append(std::move(h), ec, n));
  231. };
  232. // test failure
  233. error_code ec;
  234. std::size_t n = 0;
  235. if(in_->fc && in_->fc->fail(ec))
  236. return upcall(ec, n);
  237. // A request to write 0 bytes to a stream is a no-op.
  238. if(buffer_bytes(buffers) == 0)
  239. return upcall(ec, n);
  240. // connection closed
  241. auto out = out_.lock();
  242. if(! out)
  243. return upcall(net::error::connection_reset, n);
  244. // copy buffers
  245. n = std::min<std::size_t>(
  246. buffer_bytes(buffers), in_->write_max);
  247. {
  248. std::lock_guard<std::mutex> lock(out->m);
  249. n = net::buffer_copy(out->b.prepare(n), buffers);
  250. out->b.commit(n);
  251. out->nwrite_bytes += n;
  252. out->notify_read();
  253. }
  254. BOOST_ASSERT(! ec);
  255. upcall(ec, n);
  256. }
  257. };
  258. //------------------------------------------------------------------------------
  259. template<class Executor>
  260. template<class MutableBufferSequence>
  261. std::size_t
  262. basic_stream<Executor>::
  263. read_some(MutableBufferSequence const& buffers)
  264. {
  265. static_assert(net::is_mutable_buffer_sequence<
  266. MutableBufferSequence>::value,
  267. "MutableBufferSequence type requirements not met");
  268. error_code ec;
  269. auto const n = read_some(buffers, ec);
  270. if(ec)
  271. BOOST_THROW_EXCEPTION(system_error{ec});
  272. return n;
  273. }
  274. template<class Executor>
  275. template<class MutableBufferSequence>
  276. std::size_t
  277. basic_stream<Executor>::
  278. read_some(MutableBufferSequence const& buffers,
  279. error_code& ec)
  280. {
  281. static_assert(net::is_mutable_buffer_sequence<
  282. MutableBufferSequence>::value,
  283. "MutableBufferSequence type requirements not met");
  284. ++in_->nread;
  285. // test failure
  286. if(in_->fc && in_->fc->fail(ec))
  287. return 0;
  288. // A request to read 0 bytes from a stream is a no-op.
  289. if(buffer_bytes(buffers) == 0)
  290. {
  291. ec = {};
  292. return 0;
  293. }
  294. std::unique_lock<std::mutex> lock{in_->m};
  295. BOOST_ASSERT(! in_->op);
  296. in_->cv.wait(lock,
  297. [&]()
  298. {
  299. return
  300. in_->b.size() > 0 ||
  301. in_->code != detail::stream_status::ok;
  302. });
  303. // deliver bytes before eof
  304. if(in_->b.size() > 0)
  305. {
  306. auto const n = net::buffer_copy(
  307. buffers, in_->b.data(), in_->read_max);
  308. in_->b.consume(n);
  309. in_->nread_bytes += n;
  310. return n;
  311. }
  312. // deliver error
  313. BOOST_ASSERT(in_->code != detail::stream_status::ok);
  314. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  315. return 0;
  316. }
  317. template<class Executor>
  318. template<class MutableBufferSequence,
  319. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler>
  320. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler, void(error_code, std::size_t))
  321. basic_stream<Executor>::
  322. async_read_some(
  323. MutableBufferSequence const& buffers,
  324. ReadHandler&& handler)
  325. {
  326. static_assert(net::is_mutable_buffer_sequence<
  327. MutableBufferSequence>::value,
  328. "MutableBufferSequence type requirements not met");
  329. return net::async_initiate<
  330. ReadHandler,
  331. void(error_code, std::size_t)>(
  332. run_read_op{in_},
  333. handler,
  334. buffers);
  335. }
  336. template<class Executor>
  337. template<class ConstBufferSequence>
  338. std::size_t
  339. basic_stream<Executor>::
  340. write_some(ConstBufferSequence const& buffers)
  341. {
  342. static_assert(net::is_const_buffer_sequence<
  343. ConstBufferSequence>::value,
  344. "ConstBufferSequence type requirements not met");
  345. error_code ec;
  346. auto const bytes_transferred =
  347. write_some(buffers, ec);
  348. if(ec)
  349. BOOST_THROW_EXCEPTION(system_error{ec});
  350. return bytes_transferred;
  351. }
  352. template<class Executor>
  353. template<class ConstBufferSequence>
  354. std::size_t
  355. basic_stream<Executor>::
  356. write_some(
  357. ConstBufferSequence const& buffers, error_code& ec)
  358. {
  359. static_assert(net::is_const_buffer_sequence<
  360. ConstBufferSequence>::value,
  361. "ConstBufferSequence type requirements not met");
  362. ++in_->nwrite;
  363. // test failure
  364. if(in_->fc && in_->fc->fail(ec))
  365. return 0;
  366. // A request to write 0 bytes to a stream is a no-op.
  367. if(buffer_bytes(buffers) == 0)
  368. {
  369. ec = {};
  370. return 0;
  371. }
  372. // connection closed
  373. auto out = out_.lock();
  374. if(! out)
  375. {
  376. BOOST_BEAST_ASSIGN_EC(ec, net::error::connection_reset);
  377. return 0;
  378. }
  379. // copy buffers
  380. auto n = std::min<std::size_t>(
  381. buffer_bytes(buffers), in_->write_max);
  382. {
  383. std::lock_guard<std::mutex> lock(out->m);
  384. n = net::buffer_copy(out->b.prepare(n), buffers);
  385. out->b.commit(n);
  386. out->nwrite_bytes += n;
  387. out->notify_read();
  388. }
  389. return n;
  390. }
  391. template<class Executor>
  392. template<class ConstBufferSequence,
  393. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler>
  394. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(WriteHandler, void(error_code, std::size_t))
  395. basic_stream<Executor>::
  396. async_write_some(
  397. ConstBufferSequence const& buffers,
  398. WriteHandler&& handler)
  399. {
  400. static_assert(net::is_const_buffer_sequence<
  401. ConstBufferSequence>::value,
  402. "ConstBufferSequence type requirements not met");
  403. return net::async_initiate<
  404. WriteHandler,
  405. void(error_code, std::size_t)>(
  406. run_write_op{in_},
  407. handler,
  408. out_,
  409. buffers);
  410. }
  411. //------------------------------------------------------------------------------
  412. template<class Executor, class TeardownHandler>
  413. void
  414. async_teardown(
  415. role_type,
  416. basic_stream<Executor>& s,
  417. TeardownHandler&& handler)
  418. {
  419. error_code ec;
  420. if( s.in_->fc &&
  421. s.in_->fc->fail(ec))
  422. return net::post(
  423. s.get_executor(),
  424. net::append(std::move(handler), ec));
  425. s.close();
  426. if( s.in_->fc &&
  427. s.in_->fc->fail(ec))
  428. {
  429. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  430. }
  431. else
  432. ec = {};
  433. net::post(s.get_executor(), net::append(std::move(handler), ec));
  434. }
  435. //------------------------------------------------------------------------------
  436. template<class Executor, class Arg1, class... ArgN>
  437. basic_stream<Executor>
  438. connect(stream& to, Arg1&& arg1, ArgN&&... argn)
  439. {
  440. stream from{
  441. std::forward<Arg1>(arg1),
  442. std::forward<ArgN>(argn)...};
  443. from.connect(to);
  444. return from;
  445. }
  446. template<class Executor>
  447. auto basic_stream<Executor>::get_executor() noexcept -> executor_type
  448. {
  449. return detail::extract_executor_op<Executor>()(in_->exec);
  450. }
  451. } // test
  452. } // beast
  453. } // boost
  454. #endif