channel_service.hpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. //
  2. // experimental/detail/impl/channel_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
  11. #define BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/push_options.hpp>
  16. namespace boost {
  17. namespace asio {
  18. namespace experimental {
  19. namespace detail {
  20. template <typename Mutex>
  21. inline channel_service<Mutex>::channel_service(
  22. boost::asio::execution_context& ctx)
  23. : boost::asio::detail::execution_context_service_base<channel_service>(ctx),
  24. mutex_(),
  25. impl_list_(0)
  26. {
  27. }
  28. template <typename Mutex>
  29. inline void channel_service<Mutex>::shutdown()
  30. {
  31. // Abandon all pending operations.
  32. boost::asio::detail::op_queue<channel_operation> ops;
  33. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  34. base_implementation_type* impl = impl_list_;
  35. while (impl)
  36. {
  37. ops.push(impl->waiters_);
  38. impl = impl->next_;
  39. }
  40. }
  41. template <typename Mutex>
  42. inline void channel_service<Mutex>::construct(
  43. channel_service<Mutex>::base_implementation_type& impl,
  44. std::size_t max_buffer_size)
  45. {
  46. impl.max_buffer_size_ = max_buffer_size;
  47. impl.receive_state_ = block;
  48. impl.send_state_ = max_buffer_size ? buffer : block;
  49. // Insert implementation into linked list of all implementations.
  50. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  51. impl.next_ = impl_list_;
  52. impl.prev_ = 0;
  53. if (impl_list_)
  54. impl_list_->prev_ = &impl;
  55. impl_list_ = &impl;
  56. }
  57. template <typename Mutex>
  58. template <typename Traits, typename... Signatures>
  59. void channel_service<Mutex>::destroy(
  60. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  61. {
  62. cancel(impl);
  63. base_destroy(impl);
  64. }
  65. template <typename Mutex>
  66. template <typename Traits, typename... Signatures>
  67. void channel_service<Mutex>::move_construct(
  68. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  69. channel_service<Mutex>::implementation_type<
  70. Traits, Signatures...>& other_impl)
  71. {
  72. impl.max_buffer_size_ = other_impl.max_buffer_size_;
  73. impl.receive_state_ = other_impl.receive_state_;
  74. other_impl.receive_state_ = block;
  75. impl.send_state_ = other_impl.send_state_;
  76. other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
  77. impl.buffer_move_from(other_impl);
  78. // Insert implementation into linked list of all implementations.
  79. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  80. impl.next_ = impl_list_;
  81. impl.prev_ = 0;
  82. if (impl_list_)
  83. impl_list_->prev_ = &impl;
  84. impl_list_ = &impl;
  85. }
  86. template <typename Mutex>
  87. template <typename Traits, typename... Signatures>
  88. void channel_service<Mutex>::move_assign(
  89. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  90. channel_service& other_service,
  91. channel_service<Mutex>::implementation_type<
  92. Traits, Signatures...>& other_impl)
  93. {
  94. cancel(impl);
  95. if (this != &other_service)
  96. {
  97. // Remove implementation from linked list of all implementations.
  98. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  99. if (impl_list_ == &impl)
  100. impl_list_ = impl.next_;
  101. if (impl.prev_)
  102. impl.prev_->next_ = impl.next_;
  103. if (impl.next_)
  104. impl.next_->prev_= impl.prev_;
  105. impl.next_ = 0;
  106. impl.prev_ = 0;
  107. }
  108. impl.max_buffer_size_ = other_impl.max_buffer_size_;
  109. impl.receive_state_ = other_impl.receive_state_;
  110. other_impl.receive_state_ = block;
  111. impl.send_state_ = other_impl.send_state_;
  112. other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
  113. impl.buffer_move_from(other_impl);
  114. if (this != &other_service)
  115. {
  116. // Insert implementation into linked list of all implementations.
  117. boost::asio::detail::mutex::scoped_lock lock(other_service.mutex_);
  118. impl.next_ = other_service.impl_list_;
  119. impl.prev_ = 0;
  120. if (other_service.impl_list_)
  121. other_service.impl_list_->prev_ = &impl;
  122. other_service.impl_list_ = &impl;
  123. }
  124. }
  125. template <typename Mutex>
  126. inline void channel_service<Mutex>::base_destroy(
  127. channel_service<Mutex>::base_implementation_type& impl)
  128. {
  129. // Remove implementation from linked list of all implementations.
  130. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  131. if (impl_list_ == &impl)
  132. impl_list_ = impl.next_;
  133. if (impl.prev_)
  134. impl.prev_->next_ = impl.next_;
  135. if (impl.next_)
  136. impl.next_->prev_= impl.prev_;
  137. impl.next_ = 0;
  138. impl.prev_ = 0;
  139. }
  140. template <typename Mutex>
  141. inline std::size_t channel_service<Mutex>::capacity(
  142. const channel_service<Mutex>::base_implementation_type& impl)
  143. const noexcept
  144. {
  145. typename Mutex::scoped_lock lock(impl.mutex_);
  146. return impl.max_buffer_size_;
  147. }
  148. template <typename Mutex>
  149. inline bool channel_service<Mutex>::is_open(
  150. const channel_service<Mutex>::base_implementation_type& impl)
  151. const noexcept
  152. {
  153. typename Mutex::scoped_lock lock(impl.mutex_);
  154. return impl.send_state_ != closed;
  155. }
  156. template <typename Mutex>
  157. template <typename Traits, typename... Signatures>
  158. void channel_service<Mutex>::reset(
  159. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  160. {
  161. cancel(impl);
  162. typename Mutex::scoped_lock lock(impl.mutex_);
  163. impl.receive_state_ = block;
  164. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  165. impl.buffer_clear();
  166. }
  167. template <typename Mutex>
  168. template <typename Traits, typename... Signatures>
  169. void channel_service<Mutex>::close(
  170. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  171. {
  172. typedef typename implementation_type<Traits,
  173. Signatures...>::traits_type traits_type;
  174. typedef typename implementation_type<Traits,
  175. Signatures...>::payload_type payload_type;
  176. typename Mutex::scoped_lock lock(impl.mutex_);
  177. if (impl.receive_state_ == block)
  178. {
  179. while (channel_operation* op = impl.waiters_.front())
  180. {
  181. impl.waiters_.pop();
  182. traits_type::invoke_receive_closed(
  183. post_receive<payload_type,
  184. typename traits_type::receive_closed_signature>(
  185. static_cast<channel_receive<payload_type>*>(op)));
  186. }
  187. }
  188. impl.send_state_ = closed;
  189. if (impl.receive_state_ != buffer)
  190. impl.receive_state_ = closed;
  191. }
  192. template <typename Mutex>
  193. template <typename Traits, typename... Signatures>
  194. void channel_service<Mutex>::cancel(
  195. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
  196. {
  197. typedef typename implementation_type<Traits,
  198. Signatures...>::traits_type traits_type;
  199. typedef typename implementation_type<Traits,
  200. Signatures...>::payload_type payload_type;
  201. typename Mutex::scoped_lock lock(impl.mutex_);
  202. while (channel_operation* op = impl.waiters_.front())
  203. {
  204. if (impl.send_state_ == block)
  205. {
  206. impl.waiters_.pop();
  207. static_cast<channel_send<payload_type>*>(op)->cancel();
  208. }
  209. else
  210. {
  211. impl.waiters_.pop();
  212. traits_type::invoke_receive_cancelled(
  213. post_receive<payload_type,
  214. typename traits_type::receive_cancelled_signature>(
  215. static_cast<channel_receive<payload_type>*>(op)));
  216. }
  217. }
  218. if (impl.receive_state_ == waiter)
  219. impl.receive_state_ = block;
  220. if (impl.send_state_ == waiter)
  221. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  222. }
  223. template <typename Mutex>
  224. template <typename Traits, typename... Signatures>
  225. void channel_service<Mutex>::cancel_by_key(
  226. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  227. void* cancellation_key)
  228. {
  229. typedef typename implementation_type<Traits,
  230. Signatures...>::traits_type traits_type;
  231. typedef typename implementation_type<Traits,
  232. Signatures...>::payload_type payload_type;
  233. typename Mutex::scoped_lock lock(impl.mutex_);
  234. boost::asio::detail::op_queue<channel_operation> other_ops;
  235. while (channel_operation* op = impl.waiters_.front())
  236. {
  237. if (op->cancellation_key_ == cancellation_key)
  238. {
  239. if (impl.send_state_ == block)
  240. {
  241. impl.waiters_.pop();
  242. static_cast<channel_send<payload_type>*>(op)->cancel();
  243. }
  244. else
  245. {
  246. impl.waiters_.pop();
  247. traits_type::invoke_receive_cancelled(
  248. post_receive<payload_type,
  249. typename traits_type::receive_cancelled_signature>(
  250. static_cast<channel_receive<payload_type>*>(op)));
  251. }
  252. }
  253. else
  254. {
  255. impl.waiters_.pop();
  256. other_ops.push(op);
  257. }
  258. }
  259. impl.waiters_.push(other_ops);
  260. if (impl.waiters_.empty())
  261. {
  262. if (impl.receive_state_ == waiter)
  263. impl.receive_state_ = block;
  264. if (impl.send_state_ == waiter)
  265. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  266. }
  267. }
  268. template <typename Mutex>
  269. inline bool channel_service<Mutex>::ready(
  270. const channel_service<Mutex>::base_implementation_type& impl)
  271. const noexcept
  272. {
  273. typename Mutex::scoped_lock lock(impl.mutex_);
  274. return impl.receive_state_ != block;
  275. }
  276. template <typename Mutex>
  277. template <typename Message, typename Traits,
  278. typename... Signatures, typename... Args>
  279. bool channel_service<Mutex>::try_send(
  280. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  281. bool via_dispatch, Args&&... args)
  282. {
  283. typedef typename implementation_type<Traits,
  284. Signatures...>::payload_type payload_type;
  285. typename Mutex::scoped_lock lock(impl.mutex_);
  286. switch (impl.send_state_)
  287. {
  288. case block:
  289. {
  290. return false;
  291. }
  292. case buffer:
  293. {
  294. impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
  295. impl.receive_state_ = buffer;
  296. if (impl.buffer_size() == impl.max_buffer_size_)
  297. impl.send_state_ = block;
  298. return true;
  299. }
  300. case waiter:
  301. {
  302. payload_type payload(Message(0, static_cast<Args&&>(args)...));
  303. channel_receive<payload_type>* receive_op =
  304. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  305. impl.waiters_.pop();
  306. if (impl.waiters_.empty())
  307. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  308. lock.unlock();
  309. if (via_dispatch)
  310. receive_op->dispatch(static_cast<payload_type&&>(payload));
  311. else
  312. receive_op->post(static_cast<payload_type&&>(payload));
  313. return true;
  314. }
  315. case closed:
  316. default:
  317. {
  318. return false;
  319. }
  320. }
  321. }
  322. template <typename Mutex>
  323. template <typename Message, typename Traits,
  324. typename... Signatures, typename... Args>
  325. std::size_t channel_service<Mutex>::try_send_n(
  326. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  327. std::size_t count, bool via_dispatch, Args&&... args)
  328. {
  329. typedef typename implementation_type<Traits,
  330. Signatures...>::payload_type payload_type;
  331. typename Mutex::scoped_lock lock(impl.mutex_);
  332. if (count == 0)
  333. return 0;
  334. switch (impl.send_state_)
  335. {
  336. case block:
  337. return 0;
  338. case buffer:
  339. case waiter:
  340. break;
  341. case closed:
  342. default:
  343. return 0;
  344. }
  345. payload_type payload(Message(0, static_cast<Args&&>(args)...));
  346. for (std::size_t i = 0; i < count; ++i)
  347. {
  348. switch (impl.send_state_)
  349. {
  350. case block:
  351. {
  352. return i;
  353. }
  354. case buffer:
  355. {
  356. i += impl.buffer_push_n(count - i,
  357. static_cast<payload_type&&>(payload));
  358. impl.receive_state_ = buffer;
  359. if (impl.buffer_size() == impl.max_buffer_size_)
  360. impl.send_state_ = block;
  361. return i;
  362. }
  363. case waiter:
  364. {
  365. channel_receive<payload_type>* receive_op =
  366. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  367. impl.waiters_.pop();
  368. if (impl.waiters_.empty())
  369. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  370. lock.unlock();
  371. if (via_dispatch)
  372. receive_op->dispatch(payload);
  373. else
  374. receive_op->post(payload);
  375. break;
  376. }
  377. case closed:
  378. default:
  379. {
  380. return i;
  381. }
  382. }
  383. }
  384. return count;
  385. }
  386. template <typename Mutex>
  387. template <typename Traits, typename... Signatures>
  388. void channel_service<Mutex>::start_send_op(
  389. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  390. channel_send<typename implementation_type<
  391. Traits, Signatures...>::payload_type>* send_op)
  392. {
  393. typedef typename implementation_type<Traits,
  394. Signatures...>::payload_type payload_type;
  395. typename Mutex::scoped_lock lock(impl.mutex_);
  396. switch (impl.send_state_)
  397. {
  398. case block:
  399. {
  400. impl.waiters_.push(send_op);
  401. if (impl.receive_state_ == block)
  402. impl.receive_state_ = waiter;
  403. return;
  404. }
  405. case buffer:
  406. {
  407. impl.buffer_push(send_op->get_payload());
  408. impl.receive_state_ = buffer;
  409. if (impl.buffer_size() == impl.max_buffer_size_)
  410. impl.send_state_ = block;
  411. send_op->immediate();
  412. break;
  413. }
  414. case waiter:
  415. {
  416. channel_receive<payload_type>* receive_op =
  417. static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
  418. impl.waiters_.pop();
  419. if (impl.waiters_.empty())
  420. impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
  421. receive_op->post(send_op->get_payload());
  422. send_op->immediate();
  423. break;
  424. }
  425. case closed:
  426. default:
  427. {
  428. send_op->close();
  429. break;
  430. }
  431. }
  432. }
  433. template <typename Mutex>
  434. template <typename Traits, typename... Signatures, typename Handler>
  435. bool channel_service<Mutex>::try_receive(
  436. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  437. Handler&& handler)
  438. {
  439. typedef typename implementation_type<Traits,
  440. Signatures...>::payload_type payload_type;
  441. typename Mutex::scoped_lock lock(impl.mutex_);
  442. switch (impl.receive_state_)
  443. {
  444. case block:
  445. {
  446. return false;
  447. }
  448. case buffer:
  449. {
  450. payload_type payload(impl.buffer_front());
  451. if (channel_send<payload_type>* send_op =
  452. static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
  453. {
  454. impl.buffer_pop();
  455. impl.buffer_push(send_op->get_payload());
  456. impl.waiters_.pop();
  457. send_op->post();
  458. }
  459. else
  460. {
  461. impl.buffer_pop();
  462. if (impl.buffer_size() == 0)
  463. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  464. impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
  465. }
  466. lock.unlock();
  467. boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
  468. boost::asio::detail::completion_payload_handler<
  469. payload_type, decay_t<Handler>>(
  470. static_cast<payload_type&&>(payload), handler2.value)();
  471. return true;
  472. }
  473. case waiter:
  474. {
  475. channel_send<payload_type>* send_op =
  476. static_cast<channel_send<payload_type>*>(impl.waiters_.front());
  477. payload_type payload = send_op->get_payload();
  478. impl.waiters_.pop();
  479. if (impl.waiters_.front() == 0)
  480. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  481. send_op->post();
  482. lock.unlock();
  483. boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
  484. boost::asio::detail::completion_payload_handler<
  485. payload_type, decay_t<Handler>>(
  486. static_cast<payload_type&&>(payload), handler2.value)();
  487. return true;
  488. }
  489. case closed:
  490. default:
  491. {
  492. return false;
  493. }
  494. }
  495. }
  496. template <typename Mutex>
  497. template <typename Traits, typename... Signatures>
  498. void channel_service<Mutex>::start_receive_op(
  499. channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
  500. channel_receive<typename implementation_type<
  501. Traits, Signatures...>::payload_type>* receive_op)
  502. {
  503. typedef typename implementation_type<Traits,
  504. Signatures...>::traits_type traits_type;
  505. typedef typename implementation_type<Traits,
  506. Signatures...>::payload_type payload_type;
  507. typename Mutex::scoped_lock lock(impl.mutex_);
  508. switch (impl.receive_state_)
  509. {
  510. case block:
  511. {
  512. impl.waiters_.push(receive_op);
  513. if (impl.send_state_ != closed)
  514. impl.send_state_ = waiter;
  515. return;
  516. }
  517. case buffer:
  518. {
  519. payload_type payload(
  520. static_cast<payload_type&&>(impl.buffer_front()));
  521. if (channel_send<payload_type>* send_op =
  522. static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
  523. {
  524. impl.buffer_pop();
  525. impl.buffer_push(send_op->get_payload());
  526. impl.waiters_.pop();
  527. send_op->post();
  528. }
  529. else
  530. {
  531. impl.buffer_pop();
  532. if (impl.buffer_size() == 0)
  533. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  534. impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
  535. }
  536. receive_op->immediate(static_cast<payload_type&&>(payload));
  537. break;
  538. }
  539. case waiter:
  540. {
  541. channel_send<payload_type>* send_op =
  542. static_cast<channel_send<payload_type>*>(impl.waiters_.front());
  543. payload_type payload = send_op->get_payload();
  544. impl.waiters_.pop();
  545. if (impl.waiters_.front() == 0)
  546. impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
  547. send_op->post();
  548. receive_op->immediate(static_cast<payload_type&&>(payload));
  549. break;
  550. }
  551. case closed:
  552. default:
  553. {
  554. traits_type::invoke_receive_closed(
  555. post_receive<payload_type,
  556. typename traits_type::receive_closed_signature>(receive_op));
  557. break;
  558. }
  559. }
  560. }
  561. } // namespace detail
  562. } // namespace experimental
  563. } // namespace asio
  564. } // namespace boost
  565. #include <boost/asio/detail/pop_options.hpp>
  566. #endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP