channel_service.hpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677
  1. //
  2. // experimental/detail/channel_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
  11. #define ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #include "asio/associated_cancellation_slot.hpp"
  17. #include "asio/cancellation_type.hpp"
  18. #include "asio/detail/mutex.hpp"
  19. #include "asio/detail/op_queue.hpp"
  20. #include "asio/execution_context.hpp"
  21. #include "asio/experimental/detail/channel_message.hpp"
  22. #include "asio/experimental/detail/channel_receive_op.hpp"
  23. #include "asio/experimental/detail/channel_send_op.hpp"
  24. #include "asio/experimental/detail/has_signature.hpp"
  25. #include "asio/detail/push_options.hpp"
  26. namespace asio {
  27. namespace experimental {
  28. namespace detail {
  29. template <typename Mutex>
  30. class channel_service
  31. : public asio::detail::execution_context_service_base<
  32. channel_service<Mutex>>
  33. {
  34. public:
  35. // Possible states for a channel end.
  36. enum state
  37. {
  38. buffer = 0,
  39. waiter = 1,
  40. block = 2,
  41. closed = 3
  42. };
  43. // The base implementation type of all channels.
  44. struct base_implementation_type
  45. {
  46. // Default constructor.
  47. base_implementation_type()
  48. : receive_state_(block),
  49. send_state_(block),
  50. max_buffer_size_(0),
  51. next_(0),
  52. prev_(0)
  53. {
  54. }
  55. // The current state of the channel.
  56. state receive_state_ : 16;
  57. state send_state_ : 16;
  58. // The maximum number of elements that may be buffered in the channel.
  59. std::size_t max_buffer_size_;
  60. // The operations that are waiting on the channel.
  61. asio::detail::op_queue<channel_operation> waiters_;
  62. // Pointers to adjacent channel implementations in linked list.
  63. base_implementation_type* next_;
  64. base_implementation_type* prev_;
  65. // The mutex type to protect the internal implementation.
  66. mutable Mutex mutex_;
  67. };
  68. // The implementation for a specific value type.
  69. template <typename Traits, typename... Signatures>
  70. struct implementation_type;
  71. // Constructor.
  72. channel_service(execution_context& ctx);
  73. // Destroy all user-defined handler objects owned by the service.
  74. void shutdown();
  75. // Construct a new channel implementation.
  76. void construct(base_implementation_type& impl, std::size_t max_buffer_size);
  77. // Destroy a channel implementation.
  78. template <typename Traits, typename... Signatures>
  79. void destroy(implementation_type<Traits, Signatures...>& impl);
  80. // Move-construct a new channel implementation.
  81. template <typename Traits, typename... Signatures>
  82. void move_construct(implementation_type<Traits, Signatures...>& impl,
  83. implementation_type<Traits, Signatures...>& other_impl);
  84. // Move-assign from another channel implementation.
  85. template <typename Traits, typename... Signatures>
  86. void move_assign(implementation_type<Traits, Signatures...>& impl,
  87. channel_service& other_service,
  88. implementation_type<Traits, Signatures...>& other_impl);
  89. // Get the capacity of the channel.
  90. std::size_t capacity(
  91. const base_implementation_type& impl) const noexcept;
  92. // Determine whether the channel is open.
  93. bool is_open(const base_implementation_type& impl) const noexcept;
  94. // Reset the channel to its initial state.
  95. template <typename Traits, typename... Signatures>
  96. void reset(implementation_type<Traits, Signatures...>& impl);
  97. // Close the channel.
  98. template <typename Traits, typename... Signatures>
  99. void close(implementation_type<Traits, Signatures...>& impl);
  100. // Cancel all operations associated with the channel.
  101. template <typename Traits, typename... Signatures>
  102. void cancel(implementation_type<Traits, Signatures...>& impl);
  103. // Cancel the operation associated with the channel that has the given key.
  104. template <typename Traits, typename... Signatures>
  105. void cancel_by_key(implementation_type<Traits, Signatures...>& impl,
  106. void* cancellation_key);
  107. // Determine whether a value can be read from the channel without blocking.
  108. bool ready(const base_implementation_type& impl) const noexcept;
  109. // Synchronously send a new value into the channel.
  110. template <typename Message, typename Traits,
  111. typename... Signatures, typename... Args>
  112. bool try_send(implementation_type<Traits, Signatures...>& impl,
  113. bool via_dispatch, Args&&... args);
  114. // Synchronously send a number of new values into the channel.
  115. template <typename Message, typename Traits,
  116. typename... Signatures, typename... Args>
  117. std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl,
  118. std::size_t count, bool via_dispatch, Args&&... args);
  119. // Asynchronously send a new value into the channel.
  120. template <typename Traits, typename... Signatures,
  121. typename Handler, typename IoExecutor>
  122. void async_send(implementation_type<Traits, Signatures...>& impl,
  123. typename implementation_type<Traits,
  124. Signatures...>::payload_type&& payload,
  125. Handler& handler, const IoExecutor& io_ex)
  126. {
  127. associated_cancellation_slot_t<Handler> slot
  128. = asio::get_associated_cancellation_slot(handler);
  129. // Allocate and construct an operation to wrap the handler.
  130. typedef channel_send_op<
  131. typename implementation_type<Traits, Signatures...>::payload_type,
  132. Handler, IoExecutor> op;
  133. typename op::ptr p = { asio::detail::addressof(handler),
  134. op::ptr::allocate(handler), 0 };
  135. p.p = new (p.v) op(static_cast<typename implementation_type<
  136. Traits, Signatures...>::payload_type&&>(payload), handler, io_ex);
  137. // Optionally register for per-operation cancellation.
  138. if (slot.is_connected())
  139. {
  140. p.p->cancellation_key_ =
  141. &slot.template emplace<op_cancellation<Traits, Signatures...>>(
  142. this, &impl);
  143. }
  144. ASIO_HANDLER_CREATION((this->context(), *p.p,
  145. "channel", &impl, 0, "async_send"));
  146. start_send_op(impl, p.p);
  147. p.v = p.p = 0;
  148. }
  149. // Synchronously receive a value from the channel.
  150. template <typename Traits, typename... Signatures, typename Handler>
  151. bool try_receive(implementation_type<Traits, Signatures...>& impl,
  152. Handler&& handler);
  153. // Asynchronously receive a value from the channel.
  154. template <typename Traits, typename... Signatures,
  155. typename Handler, typename IoExecutor>
  156. void async_receive(implementation_type<Traits, Signatures...>& impl,
  157. Handler& handler, const IoExecutor& io_ex)
  158. {
  159. associated_cancellation_slot_t<Handler> slot
  160. = asio::get_associated_cancellation_slot(handler);
  161. // Allocate and construct an operation to wrap the handler.
  162. typedef channel_receive_op<
  163. typename implementation_type<Traits, Signatures...>::payload_type,
  164. Handler, IoExecutor> op;
  165. typename op::ptr p = { asio::detail::addressof(handler),
  166. op::ptr::allocate(handler), 0 };
  167. p.p = new (p.v) op(handler, io_ex);
  168. // Optionally register for per-operation cancellation.
  169. if (slot.is_connected())
  170. {
  171. p.p->cancellation_key_ =
  172. &slot.template emplace<op_cancellation<Traits, Signatures...>>(
  173. this, &impl);
  174. }
  175. ASIO_HANDLER_CREATION((this->context(), *p.p,
  176. "channel", &impl, 0, "async_receive"));
  177. start_receive_op(impl, p.p);
  178. p.v = p.p = 0;
  179. }
  180. private:
  181. // Helper function object to handle a closed notification.
  182. template <typename Payload, typename Signature>
  183. struct post_receive
  184. {
  185. explicit post_receive(channel_receive<Payload>* op)
  186. : op_(op)
  187. {
  188. }
  189. template <typename... Args>
  190. void operator()(Args&&... args)
  191. {
  192. op_->post(
  193. channel_message<Signature>(0,
  194. static_cast<Args&&>(args)...));
  195. }
  196. channel_receive<Payload>* op_;
  197. };
  198. // Destroy a base channel implementation.
  199. void base_destroy(base_implementation_type& impl);
  200. // Helper function to start an asynchronous put operation.
  201. template <typename Traits, typename... Signatures>
  202. void start_send_op(implementation_type<Traits, Signatures...>& impl,
  203. channel_send<typename implementation_type<
  204. Traits, Signatures...>::payload_type>* send_op);
  205. // Helper function to start an asynchronous get operation.
  206. template <typename Traits, typename... Signatures>
  207. void start_receive_op(implementation_type<Traits, Signatures...>& impl,
  208. channel_receive<typename implementation_type<
  209. Traits, Signatures...>::payload_type>* receive_op);
  210. // Helper class used to implement per-operation cancellation.
  211. template <typename Traits, typename... Signatures>
  212. class op_cancellation
  213. {
  214. public:
  215. op_cancellation(channel_service* s,
  216. implementation_type<Traits, Signatures...>* impl)
  217. : service_(s),
  218. impl_(impl)
  219. {
  220. }
  221. void operator()(cancellation_type_t type)
  222. {
  223. if (!!(type &
  224. (cancellation_type::terminal
  225. | cancellation_type::partial
  226. | cancellation_type::total)))
  227. {
  228. service_->cancel_by_key(*impl_, this);
  229. }
  230. }
  231. private:
  232. channel_service* service_;
  233. implementation_type<Traits, Signatures...>* impl_;
  234. };
  235. // Mutex to protect access to the linked list of implementations.
  236. asio::detail::mutex mutex_;
  237. // The head of a linked list of all implementations.
  238. base_implementation_type* impl_list_;
  239. };
  240. // The implementation for a specific value type.
  241. template <typename Mutex>
  242. template <typename Traits, typename... Signatures>
  243. struct channel_service<Mutex>::implementation_type : base_implementation_type
  244. {
  245. // The traits type associated with the channel.
  246. typedef typename Traits::template rebind<Signatures...>::other traits_type;
  247. // Type of an element stored in the buffer.
  248. typedef conditional_t<
  249. has_signature<
  250. typename traits_type::receive_cancelled_signature,
  251. Signatures...
  252. >::value,
  253. conditional_t<
  254. has_signature<
  255. typename traits_type::receive_closed_signature,
  256. Signatures...
  257. >::value,
  258. channel_payload<Signatures...>,
  259. channel_payload<
  260. Signatures...,
  261. typename traits_type::receive_closed_signature
  262. >
  263. >,
  264. conditional_t<
  265. has_signature<
  266. typename traits_type::receive_closed_signature,
  267. Signatures...,
  268. typename traits_type::receive_cancelled_signature
  269. >::value,
  270. channel_payload<
  271. Signatures...,
  272. typename traits_type::receive_cancelled_signature
  273. >,
  274. channel_payload<
  275. Signatures...,
  276. typename traits_type::receive_cancelled_signature,
  277. typename traits_type::receive_closed_signature
  278. >
  279. >
  280. > payload_type;
  281. // Move from another buffer.
  282. void buffer_move_from(implementation_type& other)
  283. {
  284. buffer_ = static_cast<
  285. typename traits_type::template container<payload_type>::type&&>(
  286. other.buffer_);
  287. other.buffer_clear();
  288. }
  289. // Get number of buffered elements.
  290. std::size_t buffer_size() const
  291. {
  292. return buffer_.size();
  293. }
  294. // Push a new value to the back of the buffer.
  295. void buffer_push(payload_type payload)
  296. {
  297. buffer_.push_back(static_cast<payload_type&&>(payload));
  298. }
  299. // Push new values to the back of the buffer.
  300. std::size_t buffer_push_n(std::size_t count, payload_type payload)
  301. {
  302. std::size_t i = 0;
  303. for (; i < count && buffer_.size() < this->max_buffer_size_; ++i)
  304. buffer_.push_back(payload);
  305. return i;
  306. }
  307. // Get the element at the front of the buffer.
  308. payload_type buffer_front()
  309. {
  310. return static_cast<payload_type&&>(buffer_.front());
  311. }
  312. // Pop a value from the front of the buffer.
  313. void buffer_pop()
  314. {
  315. buffer_.pop_front();
  316. }
  317. // Clear all buffered values.
  318. void buffer_clear()
  319. {
  320. buffer_.clear();
  321. }
  322. private:
  323. // Buffered values.
  324. typename traits_type::template container<payload_type>::type buffer_;
  325. };
  326. // The implementation for a void value type.
  327. template <typename Mutex>
  328. template <typename Traits, typename R>
  329. struct channel_service<Mutex>::implementation_type<Traits, R()>
  330. : channel_service::base_implementation_type
  331. {
  332. // The traits type associated with the channel.
  333. typedef typename Traits::template rebind<R()>::other traits_type;
  334. // Type of an element stored in the buffer.
  335. typedef conditional_t<
  336. has_signature<
  337. typename traits_type::receive_cancelled_signature,
  338. R()
  339. >::value,
  340. conditional_t<
  341. has_signature<
  342. typename traits_type::receive_closed_signature,
  343. R()
  344. >::value,
  345. channel_payload<R()>,
  346. channel_payload<
  347. R(),
  348. typename traits_type::receive_closed_signature
  349. >
  350. >,
  351. conditional_t<
  352. has_signature<
  353. typename traits_type::receive_closed_signature,
  354. R(),
  355. typename traits_type::receive_cancelled_signature
  356. >::value,
  357. channel_payload<
  358. R(),
  359. typename traits_type::receive_cancelled_signature
  360. >,
  361. channel_payload<
  362. R(),
  363. typename traits_type::receive_cancelled_signature,
  364. typename traits_type::receive_closed_signature
  365. >
  366. >
  367. > payload_type;
  368. // Construct with empty buffer.
  369. implementation_type()
  370. : buffer_(0)
  371. {
  372. }
  373. // Move from another buffer.
  374. void buffer_move_from(implementation_type& other)
  375. {
  376. buffer_ = other.buffer_;
  377. other.buffer_ = 0;
  378. }
  379. // Get number of buffered elements.
  380. std::size_t buffer_size() const
  381. {
  382. return buffer_;
  383. }
  384. // Push a new value to the back of the buffer.
  385. void buffer_push(payload_type)
  386. {
  387. ++buffer_;
  388. }
  389. // Push new values to the back of the buffer.
  390. std::size_t buffer_push_n(std::size_t count, payload_type)
  391. {
  392. std::size_t available = this->max_buffer_size_ - buffer_;
  393. count = (count < available) ? count : available;
  394. buffer_ += count;
  395. return count;
  396. }
  397. // Get the element at the front of the buffer.
  398. payload_type buffer_front()
  399. {
  400. return payload_type(channel_message<R()>(0));
  401. }
  402. // Pop a value from the front of the buffer.
  403. void buffer_pop()
  404. {
  405. --buffer_;
  406. }
  407. // Clear all values from the buffer.
  408. void buffer_clear()
  409. {
  410. buffer_ = 0;
  411. }
  412. private:
  413. // Number of buffered "values".
  414. std::size_t buffer_;
  415. };
  416. // The implementation for an error_code signature.
  417. template <typename Mutex>
  418. template <typename Traits, typename R>
  419. struct channel_service<Mutex>::implementation_type<
  420. Traits, R(asio::error_code)>
  421. : channel_service::base_implementation_type
  422. {
  423. // The traits type associated with the channel.
  424. typedef typename Traits::template rebind<R(asio::error_code)>::other
  425. traits_type;
  426. // Type of an element stored in the buffer.
  427. typedef conditional_t<
  428. has_signature<
  429. typename traits_type::receive_cancelled_signature,
  430. R(asio::error_code)
  431. >::value,
  432. conditional_t<
  433. has_signature<
  434. typename traits_type::receive_closed_signature,
  435. R(asio::error_code)
  436. >::value,
  437. channel_payload<R(asio::error_code)>,
  438. channel_payload<
  439. R(asio::error_code),
  440. typename traits_type::receive_closed_signature
  441. >
  442. >,
  443. conditional_t<
  444. has_signature<
  445. typename traits_type::receive_closed_signature,
  446. R(asio::error_code),
  447. typename traits_type::receive_cancelled_signature
  448. >::value,
  449. channel_payload<
  450. R(asio::error_code),
  451. typename traits_type::receive_cancelled_signature
  452. >,
  453. channel_payload<
  454. R(asio::error_code),
  455. typename traits_type::receive_cancelled_signature,
  456. typename traits_type::receive_closed_signature
  457. >
  458. >
  459. > payload_type;
  460. // Construct with empty buffer.
  461. implementation_type()
  462. : size_(0)
  463. {
  464. first_.count_ = 0;
  465. }
  466. // Move from another buffer.
  467. void buffer_move_from(implementation_type& other)
  468. {
  469. size_ = other.buffer_;
  470. other.size_ = 0;
  471. first_ = other.first_;
  472. other.first.count_ = 0;
  473. rest_ = static_cast<
  474. typename traits_type::template container<buffered_value>::type&&>(
  475. other.rest_);
  476. other.buffer_clear();
  477. }
  478. // Get number of buffered elements.
  479. std::size_t buffer_size() const
  480. {
  481. return size_;
  482. }
  483. // Push a new value to the back of the buffer.
  484. void buffer_push(payload_type payload)
  485. {
  486. buffered_value& last = rest_.empty() ? first_ : rest_.back();
  487. if (last.count_ == 0)
  488. {
  489. value_handler handler{last.value_};
  490. payload.receive(handler);
  491. last.count_ = 1;
  492. }
  493. else
  494. {
  495. asio::error_code value{last.value_};
  496. value_handler handler{value};
  497. payload.receive(handler);
  498. if (last.value_ == value)
  499. ++last.count_;
  500. else
  501. rest_.push_back({value, 1});
  502. }
  503. ++size_;
  504. }
  505. // Push new values to the back of the buffer.
  506. std::size_t buffer_push_n(std::size_t count, payload_type payload)
  507. {
  508. std::size_t available = this->max_buffer_size_ - size_;
  509. count = (count < available) ? count : available;
  510. if (count > 0)
  511. {
  512. buffered_value& last = rest_.empty() ? first_ : rest_.back();
  513. if (last.count_ == 0)
  514. {
  515. payload.receive(value_handler{last.value_});
  516. last.count_ = count;
  517. }
  518. else
  519. {
  520. asio::error_code value{last.value_};
  521. payload.receive(value_handler{value});
  522. if (last.value_ == value)
  523. last.count_ += count;
  524. else
  525. rest_.push_back({value, count});
  526. }
  527. size_ += count;
  528. }
  529. return count;
  530. }
  531. // Get the element at the front of the buffer.
  532. payload_type buffer_front()
  533. {
  534. return payload_type({0, first_.value_});
  535. }
  536. // Pop a value from the front of the buffer.
  537. void buffer_pop()
  538. {
  539. --size_;
  540. if (--first_.count_ == 0 && !rest_.empty())
  541. {
  542. first_ = rest_.front();
  543. rest_.pop_front();
  544. }
  545. }
  546. // Clear all values from the buffer.
  547. void buffer_clear()
  548. {
  549. size_ = 0;
  550. first_.count_ == 0;
  551. rest_.clear();
  552. }
  553. private:
  554. struct buffered_value
  555. {
  556. asio::error_code value_;
  557. std::size_t count_;
  558. };
  559. struct value_handler
  560. {
  561. asio::error_code& target_;
  562. template <typename... Args>
  563. void operator()(const asio::error_code& value, Args&&...)
  564. {
  565. target_ = value;
  566. }
  567. };
  568. buffered_value& last_value()
  569. {
  570. return rest_.empty() ? first_ : rest_.back();
  571. }
  572. // Total number of buffered values.
  573. std::size_t size_;
  574. // The first buffered value is maintained as a separate data member to avoid
  575. // allocating space in the container in the common case.
  576. buffered_value first_;
  577. // The rest of the buffered values.
  578. typename traits_type::template container<buffered_value>::type rest_;
  579. };
  580. } // namespace detail
  581. } // namespace experimental
  582. } // namespace asio
  583. #include "asio/detail/pop_options.hpp"
  584. #include "asio/experimental/detail/impl/channel_service.hpp"
  585. #endif // ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP