channel_service.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. //
  2. // experimental/detail/channel_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
  11. #define BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #include <boost/asio/associated_cancellation_slot.hpp>
  17. #include <boost/asio/cancellation_type.hpp>
  18. #include <boost/asio/detail/completion_message.hpp>
  19. #include <boost/asio/detail/completion_payload.hpp>
  20. #include <boost/asio/detail/completion_payload_handler.hpp>
  21. #include <boost/asio/detail/mutex.hpp>
  22. #include <boost/asio/detail/op_queue.hpp>
  23. #include <boost/asio/execution_context.hpp>
  24. #include <boost/asio/experimental/detail/channel_receive_op.hpp>
  25. #include <boost/asio/experimental/detail/channel_send_op.hpp>
  26. #include <boost/asio/experimental/detail/has_signature.hpp>
  27. #include <boost/asio/detail/push_options.hpp>
  28. namespace boost {
  29. namespace asio {
  30. namespace experimental {
  31. namespace detail {
  32. template <typename Mutex>
  33. class channel_service
  34. : public boost::asio::detail::execution_context_service_base<
  35. channel_service<Mutex>>
  36. {
  37. public:
  38. // Possible states for a channel end.
  39. enum state
  40. {
  41. buffer = 0,
  42. waiter = 1,
  43. block = 2,
  44. closed = 3
  45. };
  46. // The base implementation type of all channels.
  47. struct base_implementation_type
  48. {
  49. // Default constructor.
  50. base_implementation_type()
  51. : receive_state_(block),
  52. send_state_(block),
  53. max_buffer_size_(0),
  54. next_(0),
  55. prev_(0)
  56. {
  57. }
  58. // The current state of the channel.
  59. state receive_state_ : 16;
  60. state send_state_ : 16;
  61. // The maximum number of elements that may be buffered in the channel.
  62. std::size_t max_buffer_size_;
  63. // The operations that are waiting on the channel.
  64. boost::asio::detail::op_queue<channel_operation> waiters_;
  65. // Pointers to adjacent channel implementations in linked list.
  66. base_implementation_type* next_;
  67. base_implementation_type* prev_;
  68. // The mutex type to protect the internal implementation.
  69. mutable Mutex mutex_;
  70. };
  71. // The implementation for a specific value type.
  72. template <typename Traits, typename... Signatures>
  73. struct implementation_type;
  74. // Constructor.
  75. channel_service(boost::asio::execution_context& ctx);
  76. // Destroy all user-defined handler objects owned by the service.
  77. void shutdown();
  78. // Construct a new channel implementation.
  79. void construct(base_implementation_type& impl, std::size_t max_buffer_size);
  80. // Destroy a channel implementation.
  81. template <typename Traits, typename... Signatures>
  82. void destroy(implementation_type<Traits, Signatures...>& impl);
  83. // Move-construct a new channel implementation.
  84. template <typename Traits, typename... Signatures>
  85. void move_construct(implementation_type<Traits, Signatures...>& impl,
  86. implementation_type<Traits, Signatures...>& other_impl);
  87. // Move-assign from another channel implementation.
  88. template <typename Traits, typename... Signatures>
  89. void move_assign(implementation_type<Traits, Signatures...>& impl,
  90. channel_service& other_service,
  91. implementation_type<Traits, Signatures...>& other_impl);
  92. // Get the capacity of the channel.
  93. std::size_t capacity(
  94. const base_implementation_type& impl) const noexcept;
  95. // Determine whether the channel is open.
  96. bool is_open(const base_implementation_type& impl) const noexcept;
  97. // Reset the channel to its initial state.
  98. template <typename Traits, typename... Signatures>
  99. void reset(implementation_type<Traits, Signatures...>& impl);
  100. // Close the channel.
  101. template <typename Traits, typename... Signatures>
  102. void close(implementation_type<Traits, Signatures...>& impl);
  103. // Cancel all operations associated with the channel.
  104. template <typename Traits, typename... Signatures>
  105. void cancel(implementation_type<Traits, Signatures...>& impl);
  106. // Cancel the operation associated with the channel that has the given key.
  107. template <typename Traits, typename... Signatures>
  108. void cancel_by_key(implementation_type<Traits, Signatures...>& impl,
  109. void* cancellation_key);
  110. // Determine whether a value can be read from the channel without blocking.
  111. bool ready(const base_implementation_type& impl) const noexcept;
  112. // Synchronously send a new value into the channel.
  113. template <typename Message, typename Traits,
  114. typename... Signatures, typename... Args>
  115. bool try_send(implementation_type<Traits, Signatures...>& impl,
  116. bool via_dispatch, Args&&... args);
  117. // Synchronously send a number of new values into the channel.
  118. template <typename Message, typename Traits,
  119. typename... Signatures, typename... Args>
  120. std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl,
  121. std::size_t count, bool via_dispatch, Args&&... args);
  122. // Asynchronously send a new value into the channel.
  123. template <typename Traits, typename... Signatures,
  124. typename Handler, typename IoExecutor>
  125. void async_send(implementation_type<Traits, Signatures...>& impl,
  126. typename implementation_type<Traits,
  127. Signatures...>::payload_type&& payload,
  128. Handler& handler, const IoExecutor& io_ex)
  129. {
  130. associated_cancellation_slot_t<Handler> slot
  131. = boost::asio::get_associated_cancellation_slot(handler);
  132. // Allocate and construct an operation to wrap the handler.
  133. typedef channel_send_op<
  134. typename implementation_type<Traits, Signatures...>::payload_type,
  135. Handler, IoExecutor> op;
  136. typename op::ptr p = { boost::asio::detail::addressof(handler),
  137. op::ptr::allocate(handler), 0 };
  138. p.p = new (p.v) op(static_cast<typename implementation_type<
  139. Traits, Signatures...>::payload_type&&>(payload), handler, io_ex);
  140. // Optionally register for per-operation cancellation.
  141. if (slot.is_connected())
  142. {
  143. p.p->cancellation_key_ =
  144. &slot.template emplace<op_cancellation<Traits, Signatures...>>(
  145. this, &impl);
  146. }
  147. BOOST_ASIO_HANDLER_CREATION((this->context(), *p.p,
  148. "channel", &impl, 0, "async_send"));
  149. start_send_op(impl, p.p);
  150. p.v = p.p = 0;
  151. }
  152. // Synchronously receive a value from the channel.
  153. template <typename Traits, typename... Signatures, typename Handler>
  154. bool try_receive(implementation_type<Traits, Signatures...>& impl,
  155. Handler&& handler);
  156. // Asynchronously receive a value from the channel.
  157. template <typename Traits, typename... Signatures,
  158. typename Handler, typename IoExecutor>
  159. void async_receive(implementation_type<Traits, Signatures...>& impl,
  160. Handler& handler, const IoExecutor& io_ex)
  161. {
  162. associated_cancellation_slot_t<Handler> slot
  163. = boost::asio::get_associated_cancellation_slot(handler);
  164. // Allocate and construct an operation to wrap the handler.
  165. typedef channel_receive_op<
  166. typename implementation_type<Traits, Signatures...>::payload_type,
  167. Handler, IoExecutor> op;
  168. typename op::ptr p = { boost::asio::detail::addressof(handler),
  169. op::ptr::allocate(handler), 0 };
  170. p.p = new (p.v) op(handler, io_ex);
  171. // Optionally register for per-operation cancellation.
  172. if (slot.is_connected())
  173. {
  174. p.p->cancellation_key_ =
  175. &slot.template emplace<op_cancellation<Traits, Signatures...>>(
  176. this, &impl);
  177. }
  178. BOOST_ASIO_HANDLER_CREATION((this->context(), *p.p,
  179. "channel", &impl, 0, "async_receive"));
  180. start_receive_op(impl, p.p);
  181. p.v = p.p = 0;
  182. }
  183. private:
  184. // Helper function object to handle a closed notification.
  185. template <typename Payload, typename Signature>
  186. struct post_receive
  187. {
  188. explicit post_receive(channel_receive<Payload>* op)
  189. : op_(op)
  190. {
  191. }
  192. template <typename... Args>
  193. void operator()(Args&&... args)
  194. {
  195. op_->post(
  196. boost::asio::detail::completion_message<Signature>(0,
  197. static_cast<Args&&>(args)...));
  198. }
  199. channel_receive<Payload>* op_;
  200. };
  201. // Destroy a base channel implementation.
  202. void base_destroy(base_implementation_type& impl);
  203. // Helper function to start an asynchronous put operation.
  204. template <typename Traits, typename... Signatures>
  205. void start_send_op(implementation_type<Traits, Signatures...>& impl,
  206. channel_send<typename implementation_type<
  207. Traits, Signatures...>::payload_type>* send_op);
  208. // Helper function to start an asynchronous get operation.
  209. template <typename Traits, typename... Signatures>
  210. void start_receive_op(implementation_type<Traits, Signatures...>& impl,
  211. channel_receive<typename implementation_type<
  212. Traits, Signatures...>::payload_type>* receive_op);
  213. // Helper class used to implement per-operation cancellation.
  214. template <typename Traits, typename... Signatures>
  215. class op_cancellation
  216. {
  217. public:
  218. op_cancellation(channel_service* s,
  219. implementation_type<Traits, Signatures...>* impl)
  220. : service_(s),
  221. impl_(impl)
  222. {
  223. }
  224. void operator()(cancellation_type_t type)
  225. {
  226. if (!!(type &
  227. (cancellation_type::terminal
  228. | cancellation_type::partial
  229. | cancellation_type::total)))
  230. {
  231. service_->cancel_by_key(*impl_, this);
  232. }
  233. }
  234. private:
  235. channel_service* service_;
  236. implementation_type<Traits, Signatures...>* impl_;
  237. };
  238. // Mutex to protect access to the linked list of implementations.
  239. boost::asio::detail::mutex mutex_;
  240. // The head of a linked list of all implementations.
  241. base_implementation_type* impl_list_;
  242. };
  243. // The implementation for a specific value type.
  244. template <typename Mutex>
  245. template <typename Traits, typename... Signatures>
  246. struct channel_service<Mutex>::implementation_type : base_implementation_type
  247. {
  248. // The traits type associated with the channel.
  249. typedef typename Traits::template rebind<Signatures...>::other traits_type;
  250. // Type of an element stored in the buffer.
  251. typedef conditional_t<
  252. has_signature<
  253. typename traits_type::receive_cancelled_signature,
  254. Signatures...
  255. >::value,
  256. conditional_t<
  257. has_signature<
  258. typename traits_type::receive_closed_signature,
  259. Signatures...
  260. >::value,
  261. boost::asio::detail::completion_payload<Signatures...>,
  262. boost::asio::detail::completion_payload<
  263. Signatures...,
  264. typename traits_type::receive_closed_signature
  265. >
  266. >,
  267. conditional_t<
  268. has_signature<
  269. typename traits_type::receive_closed_signature,
  270. Signatures...,
  271. typename traits_type::receive_cancelled_signature
  272. >::value,
  273. boost::asio::detail::completion_payload<
  274. Signatures...,
  275. typename traits_type::receive_cancelled_signature
  276. >,
  277. boost::asio::detail::completion_payload<
  278. Signatures...,
  279. typename traits_type::receive_cancelled_signature,
  280. typename traits_type::receive_closed_signature
  281. >
  282. >
  283. > payload_type;
  284. // Move from another buffer.
  285. void buffer_move_from(implementation_type& other)
  286. {
  287. buffer_ = static_cast<
  288. typename traits_type::template container<payload_type>::type&&>(
  289. other.buffer_);
  290. other.buffer_clear();
  291. }
  292. // Get number of buffered elements.
  293. std::size_t buffer_size() const
  294. {
  295. return buffer_.size();
  296. }
  297. // Push a new value to the back of the buffer.
  298. void buffer_push(payload_type payload)
  299. {
  300. buffer_.push_back(static_cast<payload_type&&>(payload));
  301. }
  302. // Push new values to the back of the buffer.
  303. std::size_t buffer_push_n(std::size_t count, payload_type payload)
  304. {
  305. std::size_t i = 0;
  306. for (; i < count && buffer_.size() < this->max_buffer_size_; ++i)
  307. buffer_.push_back(payload);
  308. return i;
  309. }
  310. // Get the element at the front of the buffer.
  311. payload_type buffer_front()
  312. {
  313. return static_cast<payload_type&&>(buffer_.front());
  314. }
  315. // Pop a value from the front of the buffer.
  316. void buffer_pop()
  317. {
  318. buffer_.pop_front();
  319. }
  320. // Clear all buffered values.
  321. void buffer_clear()
  322. {
  323. buffer_.clear();
  324. }
  325. private:
  326. // Buffered values.
  327. typename traits_type::template container<payload_type>::type buffer_;
  328. };
  329. // The implementation for a void value type.
  330. template <typename Mutex>
  331. template <typename Traits, typename R>
  332. struct channel_service<Mutex>::implementation_type<Traits, R()>
  333. : channel_service::base_implementation_type
  334. {
  335. // The traits type associated with the channel.
  336. typedef typename Traits::template rebind<R()>::other traits_type;
  337. // Type of an element stored in the buffer.
  338. typedef conditional_t<
  339. has_signature<
  340. typename traits_type::receive_cancelled_signature,
  341. R()
  342. >::value,
  343. conditional_t<
  344. has_signature<
  345. typename traits_type::receive_closed_signature,
  346. R()
  347. >::value,
  348. boost::asio::detail::completion_payload<R()>,
  349. boost::asio::detail::completion_payload<
  350. R(),
  351. typename traits_type::receive_closed_signature
  352. >
  353. >,
  354. conditional_t<
  355. has_signature<
  356. typename traits_type::receive_closed_signature,
  357. R(),
  358. typename traits_type::receive_cancelled_signature
  359. >::value,
  360. boost::asio::detail::completion_payload<
  361. R(),
  362. typename traits_type::receive_cancelled_signature
  363. >,
  364. boost::asio::detail::completion_payload<
  365. R(),
  366. typename traits_type::receive_cancelled_signature,
  367. typename traits_type::receive_closed_signature
  368. >
  369. >
  370. > payload_type;
  371. // Construct with empty buffer.
  372. implementation_type()
  373. : buffer_(0)
  374. {
  375. }
  376. // Move from another buffer.
  377. void buffer_move_from(implementation_type& other)
  378. {
  379. buffer_ = other.buffer_;
  380. other.buffer_ = 0;
  381. }
  382. // Get number of buffered elements.
  383. std::size_t buffer_size() const
  384. {
  385. return buffer_;
  386. }
  387. // Push a new value to the back of the buffer.
  388. void buffer_push(payload_type)
  389. {
  390. ++buffer_;
  391. }
  392. // Push new values to the back of the buffer.
  393. std::size_t buffer_push_n(std::size_t count, payload_type)
  394. {
  395. std::size_t available = this->max_buffer_size_ - buffer_;
  396. count = (count < available) ? count : available;
  397. buffer_ += count;
  398. return count;
  399. }
  400. // Get the element at the front of the buffer.
  401. payload_type buffer_front()
  402. {
  403. return payload_type(boost::asio::detail::completion_message<R()>(0));
  404. }
  405. // Pop a value from the front of the buffer.
  406. void buffer_pop()
  407. {
  408. --buffer_;
  409. }
  410. // Clear all values from the buffer.
  411. void buffer_clear()
  412. {
  413. buffer_ = 0;
  414. }
  415. private:
  416. // Number of buffered "values".
  417. std::size_t buffer_;
  418. };
  419. // The implementation for an error_code signature.
  420. template <typename Mutex>
  421. template <typename Traits, typename R>
  422. struct channel_service<Mutex>::implementation_type<
  423. Traits, R(boost::system::error_code)>
  424. : channel_service::base_implementation_type
  425. {
  426. // The traits type associated with the channel.
  427. typedef typename Traits::template rebind<R(boost::system::error_code)>::other
  428. traits_type;
  429. // Type of an element stored in the buffer.
  430. typedef conditional_t<
  431. has_signature<
  432. typename traits_type::receive_cancelled_signature,
  433. R(boost::system::error_code)
  434. >::value,
  435. conditional_t<
  436. has_signature<
  437. typename traits_type::receive_closed_signature,
  438. R(boost::system::error_code)
  439. >::value,
  440. boost::asio::detail::completion_payload<R(boost::system::error_code)>,
  441. boost::asio::detail::completion_payload<
  442. R(boost::system::error_code),
  443. typename traits_type::receive_closed_signature
  444. >
  445. >,
  446. conditional_t<
  447. has_signature<
  448. typename traits_type::receive_closed_signature,
  449. R(boost::system::error_code),
  450. typename traits_type::receive_cancelled_signature
  451. >::value,
  452. boost::asio::detail::completion_payload<
  453. R(boost::system::error_code),
  454. typename traits_type::receive_cancelled_signature
  455. >,
  456. boost::asio::detail::completion_payload<
  457. R(boost::system::error_code),
  458. typename traits_type::receive_cancelled_signature,
  459. typename traits_type::receive_closed_signature
  460. >
  461. >
  462. > payload_type;
  463. // Construct with empty buffer.
  464. implementation_type()
  465. : size_(0)
  466. {
  467. first_.count_ = 0;
  468. }
  469. // Move from another buffer.
  470. void buffer_move_from(implementation_type& other)
  471. {
  472. size_ = other.buffer_;
  473. other.size_ = 0;
  474. first_ = other.first_;
  475. other.first.count_ = 0;
  476. rest_ = static_cast<
  477. typename traits_type::template container<buffered_value>::type&&>(
  478. other.rest_);
  479. other.buffer_clear();
  480. }
  481. // Get number of buffered elements.
  482. std::size_t buffer_size() const
  483. {
  484. return size_;
  485. }
  486. // Push a new value to the back of the buffer.
  487. void buffer_push(payload_type payload)
  488. {
  489. buffered_value& last = rest_.empty() ? first_ : rest_.back();
  490. if (last.count_ == 0)
  491. {
  492. value_handler handler{last.value_};
  493. payload.receive(handler);
  494. last.count_ = 1;
  495. }
  496. else
  497. {
  498. boost::system::error_code value{last.value_};
  499. value_handler handler{value};
  500. payload.receive(handler);
  501. if (last.value_ == value)
  502. ++last.count_;
  503. else
  504. rest_.push_back({value, 1});
  505. }
  506. ++size_;
  507. }
  508. // Push new values to the back of the buffer.
  509. std::size_t buffer_push_n(std::size_t count, payload_type payload)
  510. {
  511. std::size_t available = this->max_buffer_size_ - size_;
  512. count = (count < available) ? count : available;
  513. if (count > 0)
  514. {
  515. buffered_value& last = rest_.empty() ? first_ : rest_.back();
  516. if (last.count_ == 0)
  517. {
  518. payload.receive(value_handler{last.value_});
  519. last.count_ = count;
  520. }
  521. else
  522. {
  523. boost::system::error_code value{last.value_};
  524. payload.receive(value_handler{value});
  525. if (last.value_ == value)
  526. last.count_ += count;
  527. else
  528. rest_.push_back({value, count});
  529. }
  530. size_ += count;
  531. }
  532. return count;
  533. }
  534. // Get the element at the front of the buffer.
  535. payload_type buffer_front()
  536. {
  537. return payload_type({0, first_.value_});
  538. }
  539. // Pop a value from the front of the buffer.
  540. void buffer_pop()
  541. {
  542. --size_;
  543. if (--first_.count_ == 0 && !rest_.empty())
  544. {
  545. first_ = rest_.front();
  546. rest_.pop_front();
  547. }
  548. }
  549. // Clear all values from the buffer.
  550. void buffer_clear()
  551. {
  552. size_ = 0;
  553. first_.count_ == 0;
  554. rest_.clear();
  555. }
  556. private:
  557. struct buffered_value
  558. {
  559. boost::system::error_code value_;
  560. std::size_t count_;
  561. };
  562. struct value_handler
  563. {
  564. boost::system::error_code& target_;
  565. template <typename... Args>
  566. void operator()(const boost::system::error_code& value, Args&&...)
  567. {
  568. target_ = value;
  569. }
  570. };
  571. buffered_value& last_value()
  572. {
  573. return rest_.empty() ? first_ : rest_.back();
  574. }
  575. // Total number of buffered values.
  576. std::size_t size_;
  577. // The first buffered value is maintained as a separate data member to avoid
  578. // allocating space in the container in the common case.
  579. buffered_value first_;
  580. // The rest of the buffered values.
  581. typename traits_type::template container<buffered_value>::type rest_;
  582. };
  583. } // namespace detail
  584. } // namespace experimental
  585. } // namespace asio
  586. } // namespace boost
  587. #include <boost/asio/detail/pop_options.hpp>
  588. #include <boost/asio/experimental/detail/impl/channel_service.hpp>
  589. #endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP