channel_service.hpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. //
  2. // experimental/detail/impl/channel_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
  11. #define ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/push_options.hpp"
  16. namespace asio {
  17. namespace experimental {
  18. namespace detail {
  19. template <typename Mutex>
  20. inline channel_service<Mutex>::channel_service(execution_context& ctx)
  21. : asio::detail::execution_context_service_base<channel_service>(ctx),
  22. mutex_(),
  23. impl_list_(0)
  24. {
  25. }
  26. template <typename Mutex>
  27. inline void channel_service<Mutex>::shutdown()
  28. {
  29. // Abandon all pending operations.
  30. asio::detail::op_queue<channel_operation> ops;
  31. asio::detail::mutex::scoped_lock lock(mutex_);
  32. base_implementation_type* impl = impl_list_;
  33. while (impl)
  34. {
  35. ops.push(impl->waiters_);
  36. impl = impl->next_;
  37. }
  38. }
  39. template <typename Mutex>
  40. inline void channel_service<Mutex>::construct(
  41. channel_service<Mutex>::base_implementation_type& impl,
  42. std::size_t max_buffer_size)
  43. {
  44. impl.max_buffer_size_ = max_buffer_size;
  45. impl.receive_state_ = block;
  46. impl.send_state_ = max_buffer_size ? buffer : block;
  47. // Insert implementation into linked list of all implementations.
  48. asio::detail::mutex::scoped_lock lock(mutex_);
  49. impl.next_ = impl_list_;
  50. impl.prev_ = 0;
  51. if (impl_list_)
  52. impl_list_->prev_ = &impl;
  53. impl_list_ = &impl;
  54. }
  55. template <typename Mutex>
  56. template <typename Traits, typename... Signatures>
  57. void channel_service<Mutex>::destroy(
  58. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  59. {
  60. cancel(impl);
  61. base_destroy(impl);
  62. }
  63. template <typename Mutex>
  64. template <typename Traits, typename... Signatures>
  65. void channel_service<Mutex>::move_construct(
  66. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  67. channel_service<Mutex>::implementation_type<
  68. Traits, Signatures...>& other_impl)
  69. {
  70. impl.max_buffer_size_ = other_impl.max_buffer_size_;
  71. impl.receive_state_ = other_impl.receive_state_;
  72. other_impl.receive_state_ = block;
  73. impl.send_state_ = other_impl.send_state_;
  74. other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
  75. impl.buffer_move_from(other_impl);
  76. // Insert implementation into linked list of all implementations.
  77. asio::detail::mutex::scoped_lock lock(mutex_);
  78. impl.next_ = impl_list_;
  79. impl.prev_ = 0;
  80. if (impl_list_)
  81. impl_list_->prev_ = &impl;
  82. impl_list_ = &impl;
  83. }
  84. template <typename Mutex>
  85. template <typename Traits, typename... Signatures>
  86. void channel_service<Mutex>::move_assign(
  87. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  88. channel_service& other_service,
  89. channel_service<Mutex>::implementation_type<
  90. Traits, Signatures...>& other_impl)
  91. {
  92. cancel(impl);
  93. if (this != &other_service)
  94. {
  95. // Remove implementation from linked list of all implementations.
  96. asio::detail::mutex::scoped_lock lock(mutex_);
  97. if (impl_list_ == &impl)
  98. impl_list_ = impl.next_;
  99. if (impl.prev_)
  100. impl.prev_->next_ = impl.next_;
  101. if (impl.next_)
  102. impl.next_->prev_= impl.prev_;
  103. impl.next_ = 0;
  104. impl.prev_ = 0;
  105. }
  106. impl.max_buffer_size_ = other_impl.max_buffer_size_;
  107. impl.receive_state_ = other_impl.receive_state_;
  108. other_impl.receive_state_ = block;
  109. impl.send_state_ = other_impl.send_state_;
  110. other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
  111. impl.buffer_move_from(other_impl);
  112. if (this != &other_service)
  113. {
  114. // Insert implementation into linked list of all implementations.
  115. asio::detail::mutex::scoped_lock lock(other_service.mutex_);
  116. impl.next_ = other_service.impl_list_;
  117. impl.prev_ = 0;
  118. if (other_service.impl_list_)
  119. other_service.impl_list_->prev_ = &impl;
  120. other_service.impl_list_ = &impl;
  121. }
  122. }
  123. template <typename Mutex>
  124. inline void channel_service<Mutex>::base_destroy(
  125. channel_service<Mutex>::base_implementation_type& impl)
  126. {
  127. // Remove implementation from linked list of all implementations.
  128. asio::detail::mutex::scoped_lock lock(mutex_);
  129. if (impl_list_ == &impl)
  130. impl_list_ = impl.next_;
  131. if (impl.prev_)
  132. impl.prev_->next_ = impl.next_;
  133. if (impl.next_)
  134. impl.next_->prev_= impl.prev_;
  135. impl.next_ = 0;
  136. impl.prev_ = 0;
  137. }
  138. template <typename Mutex>
  139. inline std::size_t channel_service<Mutex>::capacity(
  140. const channel_service<Mutex>::base_implementation_type& impl)
  141. const noexcept
  142. {
  143. typename Mutex::scoped_lock lock(impl.mutex_);
  144. return impl.max_buffer_size_;
  145. }
  146. template <typename Mutex>
  147. inline bool channel_service<Mutex>::is_open(
  148. const channel_service<Mutex>::base_implementation_type& impl)
  149. const noexcept
  150. {
  151. typename Mutex::scoped_lock lock(impl.mutex_);
  152. return impl.send_state_ != closed;
  153. }
  154. template <typename Mutex>
  155. template <typename Traits, typename... Signatures>
  156. void channel_service<Mutex>::reset(
  157. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  158. {
  159. cancel(impl);
  160. typename Mutex::scoped_lock lock(impl.mutex_);
  161. impl.receive_state_ = block;
  162. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  163. impl.buffer_clear();
  164. }
  165. template <typename Mutex>
  166. template <typename Traits, typename... Signatures>
  167. void channel_service<Mutex>::close(
  168. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  169. {
  170. typedef typename implementation_type<Traits,
  171. Signatures...>::traits_type traits_type;
  172. typedef typename implementation_type<Traits,
  173. Signatures...>::payload_type payload_type;
  174. typename Mutex::scoped_lock lock(impl.mutex_);
  175. if (impl.receive_state_ == block)
  176. {
  177. while (channel_operation* op = impl.waiters_.front())
  178. {
  179. impl.waiters_.pop();
  180. traits_type::invoke_receive_closed(
  181. post_receive<payload_type,
  182. typename traits_type::receive_closed_signature>(
  183. static_cast<channel_receive<payload_type>*>(op)));
  184. }
  185. }
  186. impl.send_state_ = closed;
  187. if (impl.receive_state_ != buffer)
  188. impl.receive_state_ = closed;
  189. }
  190. template <typename Mutex>
  191. template <typename Traits, typename... Signatures>
  192. void channel_service<Mutex>::cancel(
  193. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  194. {
  195. typedef typename implementation_type<Traits,
  196. Signatures...>::traits_type traits_type;
  197. typedef typename implementation_type<Traits,
  198. Signatures...>::payload_type payload_type;
  199. typename Mutex::scoped_lock lock(impl.mutex_);
  200. while (channel_operation* op = impl.waiters_.front())
  201. {
  202. if (impl.send_state_ == block)
  203. {
  204. impl.waiters_.pop();
  205. static_cast<channel_send<payload_type>*>(op)->cancel();
  206. }
  207. else
  208. {
  209. impl.waiters_.pop();
  210. traits_type::invoke_receive_cancelled(
  211. post_receive<payload_type,
  212. typename traits_type::receive_cancelled_signature>(
  213. static_cast<channel_receive<payload_type>*>(op)));
  214. }
  215. }
  216. if (impl.receive_state_ == waiter)
  217. impl.receive_state_ = block;
  218. if (impl.send_state_ == waiter)
  219. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  220. }
  221. template <typename Mutex>
  222. template <typename Traits, typename... Signatures>
  223. void channel_service<Mutex>::cancel_by_key(
  224. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  225. void* cancellation_key)
  226. {
  227. typedef typename implementation_type<Traits,
  228. Signatures...>::traits_type traits_type;
  229. typedef typename implementation_type<Traits,
  230. Signatures...>::payload_type payload_type;
  231. typename Mutex::scoped_lock lock(impl.mutex_);
  232. asio::detail::op_queue<channel_operation> other_ops;
  233. while (channel_operation* op = impl.waiters_.front())
  234. {
  235. if (op->cancellation_key_ == cancellation_key)
  236. {
  237. if (impl.send_state_ == block)
  238. {
  239. impl.waiters_.pop();
  240. static_cast<channel_send<payload_type>*>(op)->cancel();
  241. }
  242. else
  243. {
  244. impl.waiters_.pop();
  245. traits_type::invoke_receive_cancelled(
  246. post_receive<payload_type,
  247. typename traits_type::receive_cancelled_signature>(
  248. static_cast<channel_receive<payload_type>*>(op)));
  249. }
  250. }
  251. else
  252. {
  253. impl.waiters_.pop();
  254. other_ops.push(op);
  255. }
  256. }
  257. impl.waiters_.push(other_ops);
  258. if (impl.waiters_.empty())
  259. {
  260. if (impl.receive_state_ == waiter)
  261. impl.receive_state_ = block;
  262. if (impl.send_state_ == waiter)
  263. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  264. }
  265. }
  266. template <typename Mutex>
  267. inline bool channel_service<Mutex>::ready(
  268. const channel_service<Mutex>::base_implementation_type& impl)
  269. const noexcept
  270. {
  271. typename Mutex::scoped_lock lock(impl.mutex_);
  272. return impl.receive_state_ != block;
  273. }
  274. template <typename Mutex>
  275. template <typename Message, typename Traits,
  276. typename... Signatures, typename... Args>
  277. bool channel_service<Mutex>::try_send(
  278. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  279. bool via_dispatch, Args&&... args)
  280. {
  281. typedef typename implementation_type<Traits,
  282. Signatures...>::payload_type payload_type;
  283. typename Mutex::scoped_lock lock(impl.mutex_);
  284. switch (impl.send_state_)
  285. {
  286. case block:
  287. {
  288. return false;
  289. }
  290. case buffer:
  291. {
  292. impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
  293. impl.receive_state_ = buffer;
  294. if (impl.buffer_size() == impl.max_buffer_size_)
  295. impl.send_state_ = block;
  296. return true;
  297. }
  298. case waiter:
  299. {
  300. payload_type payload(Message(0, static_cast<Args&&>(args)...));
  301. channel_receive<payload_type>* receive_op =
  302. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  303. impl.waiters_.pop();
  304. if (impl.waiters_.empty())
  305. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  306. lock.unlock();
  307. if (via_dispatch)
  308. receive_op->dispatch(static_cast<payload_type&&>(payload));
  309. else
  310. receive_op->post(static_cast<payload_type&&>(payload));
  311. return true;
  312. }
  313. case closed:
  314. default:
  315. {
  316. return false;
  317. }
  318. }
  319. }
  320. template <typename Mutex>
  321. template <typename Message, typename Traits,
  322. typename... Signatures, typename... Args>
  323. std::size_t channel_service<Mutex>::try_send_n(
  324. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  325. std::size_t count, bool via_dispatch, Args&&... args)
  326. {
  327. typedef typename implementation_type<Traits,
  328. Signatures...>::payload_type payload_type;
  329. typename Mutex::scoped_lock lock(impl.mutex_);
  330. if (count == 0)
  331. return 0;
  332. switch (impl.send_state_)
  333. {
  334. case block:
  335. return 0;
  336. case buffer:
  337. case waiter:
  338. break;
  339. case closed:
  340. default:
  341. return 0;
  342. }
  343. payload_type payload(Message(0, static_cast<Args&&>(args)...));
  344. for (std::size_t i = 0; i < count; ++i)
  345. {
  346. switch (impl.send_state_)
  347. {
  348. case block:
  349. {
  350. return i;
  351. }
  352. case buffer:
  353. {
  354. i += impl.buffer_push_n(count - i,
  355. static_cast<payload_type&&>(payload));
  356. impl.receive_state_ = buffer;
  357. if (impl.buffer_size() == impl.max_buffer_size_)
  358. impl.send_state_ = block;
  359. return i;
  360. }
  361. case waiter:
  362. {
  363. channel_receive<payload_type>* receive_op =
  364. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  365. impl.waiters_.pop();
  366. if (impl.waiters_.empty())
  367. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  368. lock.unlock();
  369. if (via_dispatch)
  370. receive_op->dispatch(payload);
  371. else
  372. receive_op->post(payload);
  373. break;
  374. }
  375. case closed:
  376. default:
  377. {
  378. return i;
  379. }
  380. }
  381. }
  382. return count;
  383. }
  384. template <typename Mutex>
  385. template <typename Traits, typename... Signatures>
  386. void channel_service<Mutex>::start_send_op(
  387. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  388. channel_send<typename implementation_type<
  389. Traits, Signatures...>::payload_type>* send_op)
  390. {
  391. typedef typename implementation_type<Traits,
  392. Signatures...>::payload_type payload_type;
  393. typename Mutex::scoped_lock lock(impl.mutex_);
  394. switch (impl.send_state_)
  395. {
  396. case block:
  397. {
  398. impl.waiters_.push(send_op);
  399. if (impl.receive_state_ == block)
  400. impl.receive_state_ = waiter;
  401. return;
  402. }
  403. case buffer:
  404. {
  405. impl.buffer_push(send_op->get_payload());
  406. impl.receive_state_ = buffer;
  407. if (impl.buffer_size() == impl.max_buffer_size_)
  408. impl.send_state_ = block;
  409. send_op->immediate();
  410. break;
  411. }
  412. case waiter:
  413. {
  414. channel_receive<payload_type>* receive_op =
  415. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  416. impl.waiters_.pop();
  417. if (impl.waiters_.empty())
  418. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  419. receive_op->post(send_op->get_payload());
  420. send_op->immediate();
  421. break;
  422. }
  423. case closed:
  424. default:
  425. {
  426. send_op->close();
  427. break;
  428. }
  429. }
  430. }
  431. template <typename Mutex>
  432. template <typename Traits, typename... Signatures, typename Handler>
  433. bool channel_service<Mutex>::try_receive(
  434. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  435. Handler&& handler)
  436. {
  437. typedef typename implementation_type<Traits,
  438. Signatures...>::payload_type payload_type;
  439. typename Mutex::scoped_lock lock(impl.mutex_);
  440. switch (impl.receive_state_)
  441. {
  442. case block:
  443. {
  444. return false;
  445. }
  446. case buffer:
  447. {
  448. payload_type payload(impl.buffer_front());
  449. if (channel_send<payload_type>* send_op =
  450. static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
  451. {
  452. impl.buffer_pop();
  453. impl.buffer_push(send_op->get_payload());
  454. impl.waiters_.pop();
  455. send_op->post();
  456. }
  457. else
  458. {
  459. impl.buffer_pop();
  460. if (impl.buffer_size() == 0)
  461. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  462. impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
  463. }
  464. lock.unlock();
  465. asio::detail::non_const_lvalue<Handler> handler2(handler);
  466. channel_handler<payload_type, decay_t<Handler>>(
  467. static_cast<payload_type&&>(payload), handler2.value)();
  468. return true;
  469. }
  470. case waiter:
  471. {
  472. channel_send<payload_type>* send_op =
  473. static_cast<channel_send<payload_type>*>(impl.waiters_.front());
  474. payload_type payload = send_op->get_payload();
  475. impl.waiters_.pop();
  476. if (impl.waiters_.front() == 0)
  477. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  478. send_op->post();
  479. lock.unlock();
  480. asio::detail::non_const_lvalue<Handler> handler2(handler);
  481. channel_handler<payload_type, decay_t<Handler>>(
  482. static_cast<payload_type&&>(payload), handler2.value)();
  483. return true;
  484. }
  485. case closed:
  486. default:
  487. {
  488. return false;
  489. }
  490. }
  491. }
  492. template <typename Mutex>
  493. template <typename Traits, typename... Signatures>
  494. void channel_service<Mutex>::start_receive_op(
  495. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  496. channel_receive<typename implementation_type<
  497. Traits, Signatures...>::payload_type>* receive_op)
  498. {
  499. typedef typename implementation_type<Traits,
  500. Signatures...>::traits_type traits_type;
  501. typedef typename implementation_type<Traits,
  502. Signatures...>::payload_type payload_type;
  503. typename Mutex::scoped_lock lock(impl.mutex_);
  504. switch (impl.receive_state_)
  505. {
  506. case block:
  507. {
  508. impl.waiters_.push(receive_op);
  509. if (impl.send_state_ != closed)
  510. impl.send_state_ = waiter;
  511. return;
  512. }
  513. case buffer:
  514. {
  515. payload_type payload(
  516. static_cast<payload_type&&>(impl.buffer_front()));
  517. if (channel_send<payload_type>* send_op =
  518. static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
  519. {
  520. impl.buffer_pop();
  521. impl.buffer_push(send_op->get_payload());
  522. impl.waiters_.pop();
  523. send_op->post();
  524. }
  525. else
  526. {
  527. impl.buffer_pop();
  528. if (impl.buffer_size() == 0)
  529. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  530. impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
  531. }
  532. receive_op->immediate(static_cast<payload_type&&>(payload));
  533. break;
  534. }
  535. case waiter:
  536. {
  537. channel_send<payload_type>* send_op =
  538. static_cast<channel_send<payload_type>*>(impl.waiters_.front());
  539. payload_type payload = send_op->get_payload();
  540. impl.waiters_.pop();
  541. if (impl.waiters_.front() == 0)
  542. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  543. send_op->post();
  544. receive_op->immediate(static_cast<payload_type&&>(payload));
  545. break;
  546. }
  547. case closed:
  548. default:
  549. {
  550. traits_type::invoke_receive_closed(
  551. post_receive<payload_type,
  552. typename traits_type::receive_closed_signature>(receive_op));
  553. break;
  554. }
  555. }
  556. }
  557. } // namespace detail
  558. } // namespace experimental
  559. } // namespace asio
  560. #include "asio/detail/pop_options.hpp"
  561. #endif // ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP