stream.ipp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_IPP
  11. #include <boost/beast/_experimental/test/stream.hpp>
  12. #include <boost/beast/core/buffer_traits.hpp>
  13. #include <boost/make_shared.hpp>
  14. #include <stdexcept>
  15. #include <vector>
  16. namespace boost {
  17. namespace beast {
  18. namespace test {
  19. //------------------------------------------------------------------------------
  20. template<class Executor>
  21. void basic_stream<Executor>::initiate_read(
  22. boost::shared_ptr<detail::stream_state> const& in_,
  23. std::unique_ptr<detail::stream_read_op_base>&& op,
  24. std::size_t buf_size)
  25. {
  26. std::unique_lock<std::mutex> lock(in_->m);
  27. ++in_->nread;
  28. if(in_->op != nullptr)
  29. BOOST_THROW_EXCEPTION(
  30. std::logic_error{"in_->op != nullptr"});
  31. // test failure
  32. error_code ec;
  33. if(in_->fc && in_->fc->fail(ec))
  34. {
  35. lock.unlock();
  36. (*op)(ec);
  37. return;
  38. }
  39. // A request to read 0 bytes from a stream is a no-op.
  40. if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
  41. {
  42. lock.unlock();
  43. (*op)(ec);
  44. return;
  45. }
  46. // deliver error
  47. if(in_->code != detail::stream_status::ok)
  48. {
  49. lock.unlock();
  50. (*op)(net::error::eof);
  51. return;
  52. }
  53. // complete when bytes available or closed
  54. in_->op = std::move(op);
  55. }
  56. //------------------------------------------------------------------------------
  57. template<class Executor>
  58. basic_stream<Executor>::
  59. ~basic_stream()
  60. {
  61. close();
  62. in_->remove();
  63. }
  64. template<class Executor>
  65. basic_stream<Executor>::
  66. basic_stream(basic_stream&& other)
  67. {
  68. auto in = detail::stream_service::make_impl(
  69. other.in_->exec, other.in_->fc);
  70. in_ = std::move(other.in_);
  71. out_ = std::move(other.out_);
  72. other.in_ = in;
  73. }
  74. template<class Executor>
  75. basic_stream<Executor>&
  76. basic_stream<Executor>::
  77. operator=(basic_stream&& other)
  78. {
  79. close();
  80. auto in = detail::stream_service::make_impl(
  81. other.in_->exec, other.in_->fc);
  82. in_->remove();
  83. in_ = std::move(other.in_);
  84. out_ = std::move(other.out_);
  85. other.in_ = in;
  86. return *this;
  87. }
  88. //------------------------------------------------------------------------------
  89. template<class Executor>
  90. basic_stream<Executor>::
  91. basic_stream(executor_type exec)
  92. : in_(detail::stream_service::make_impl(std::move(exec), nullptr))
  93. {
  94. }
  95. template<class Executor>
  96. basic_stream<Executor>::
  97. basic_stream(
  98. net::io_context& ioc,
  99. fail_count& fc)
  100. : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc))
  101. {
  102. }
  103. template<class Executor>
  104. basic_stream<Executor>::
  105. basic_stream(
  106. net::io_context& ioc,
  107. string_view s)
  108. : in_(detail::stream_service::make_impl(ioc.get_executor(), nullptr))
  109. {
  110. in_->b.commit(net::buffer_copy(
  111. in_->b.prepare(s.size()),
  112. net::buffer(s.data(), s.size())));
  113. }
  114. template<class Executor>
  115. basic_stream<Executor>::
  116. basic_stream(
  117. net::io_context& ioc,
  118. fail_count& fc,
  119. string_view s)
  120. : in_(detail::stream_service::make_impl(ioc.get_executor(), &fc))
  121. {
  122. in_->b.commit(net::buffer_copy(
  123. in_->b.prepare(s.size()),
  124. net::buffer(s.data(), s.size())));
  125. }
  126. template<class Executor>
  127. void
  128. basic_stream<Executor>::
  129. connect(basic_stream& remote)
  130. {
  131. BOOST_ASSERT(! out_.lock());
  132. BOOST_ASSERT(! remote.out_.lock());
  133. std::lock(in_->m, remote.in_->m);
  134. std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock};
  135. std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
  136. out_ = remote.in_;
  137. remote.out_ = in_;
  138. in_->code = detail::stream_status::ok;
  139. remote.in_->code = detail::stream_status::ok;
  140. }
  141. template<class Executor>
  142. string_view
  143. basic_stream<Executor>::
  144. str() const
  145. {
  146. auto const bs = in_->b.data();
  147. if(buffer_bytes(bs) == 0)
  148. return {};
  149. net::const_buffer const b = *net::buffer_sequence_begin(bs);
  150. return {static_cast<char const*>(b.data()), b.size()};
  151. }
  152. template<class Executor>
  153. void
  154. basic_stream<Executor>::
  155. append(string_view s)
  156. {
  157. std::lock_guard<std::mutex> lock{in_->m};
  158. in_->b.commit(net::buffer_copy(
  159. in_->b.prepare(s.size()),
  160. net::buffer(s.data(), s.size())));
  161. }
  162. template<class Executor>
  163. void
  164. basic_stream<Executor>::
  165. clear()
  166. {
  167. std::lock_guard<std::mutex> lock{in_->m};
  168. in_->b.consume(in_->b.size());
  169. }
  170. template<class Executor>
  171. void
  172. basic_stream<Executor>::
  173. close()
  174. {
  175. in_->cancel_read();
  176. // disconnect
  177. {
  178. auto out = out_.lock();
  179. out_.reset();
  180. // notify peer
  181. if(out)
  182. {
  183. std::lock_guard<std::mutex> lock(out->m);
  184. if(out->code == detail::stream_status::ok)
  185. {
  186. out->code = detail::stream_status::eof;
  187. out->notify_read();
  188. }
  189. }
  190. }
  191. }
  192. template<class Executor>
  193. void
  194. basic_stream<Executor>::
  195. close_remote()
  196. {
  197. std::lock_guard<std::mutex> lock{in_->m};
  198. if(in_->code == detail::stream_status::ok)
  199. {
  200. in_->code = detail::stream_status::eof;
  201. in_->notify_read();
  202. }
  203. }
  204. template<class Executor>
  205. void
  206. teardown(
  207. role_type,
  208. basic_stream<Executor>& s,
  209. boost::system::error_code& ec)
  210. {
  211. if( s.in_->fc &&
  212. s.in_->fc->fail(ec))
  213. return;
  214. s.close();
  215. if( s.in_->fc &&
  216. s.in_->fc->fail(ec))
  217. {
  218. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  219. }
  220. else
  221. ec = {};
  222. }
  223. //------------------------------------------------------------------------------
  224. template<class Executor>
  225. basic_stream<Executor>
  226. connect(basic_stream<Executor>& to)
  227. {
  228. basic_stream<Executor> from(to.get_executor());
  229. from.connect(to);
  230. return from;
  231. }
  232. template<class Executor>
  233. void
  234. connect(basic_stream<Executor>& s1, basic_stream<Executor>& s2)
  235. {
  236. s1.connect(s2);
  237. }
  238. } // test
  239. } // beast
  240. } // boost
  241. #endif