parallel_group.hpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. //
  2. // experimental/impl/parallel_group.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
  11. #define ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #include <atomic>
  17. #include <deque>
  18. #include <memory>
  19. #include <new>
  20. #include <tuple>
  21. #include "asio/associated_cancellation_slot.hpp"
  22. #include "asio/detail/recycling_allocator.hpp"
  23. #include "asio/detail/type_traits.hpp"
  24. #include "asio/dispatch.hpp"
  25. #include "asio/detail/push_options.hpp"
  26. namespace asio {
  27. namespace experimental {
  28. namespace detail {
  29. // Stores the result from an individual asynchronous operation.
  30. template <typename T, typename = void>
  31. struct parallel_group_op_result
  32. {
  33. public:
  34. parallel_group_op_result()
  35. : has_value_(false)
  36. {
  37. }
  38. parallel_group_op_result(parallel_group_op_result&& other)
  39. : has_value_(other.has_value_)
  40. {
  41. if (has_value_)
  42. new (&u_.value_) T(std::move(other.get()));
  43. }
  44. ~parallel_group_op_result()
  45. {
  46. if (has_value_)
  47. u_.value_.~T();
  48. }
  49. T& get() noexcept
  50. {
  51. return u_.value_;
  52. }
  53. template <typename... Args>
  54. void emplace(Args&&... args)
  55. {
  56. new (&u_.value_) T(std::forward<Args>(args)...);
  57. has_value_ = true;
  58. }
  59. private:
  60. union u
  61. {
  62. u() {}
  63. ~u() {}
  64. char c_;
  65. T value_;
  66. } u_;
  67. bool has_value_;
  68. };
  69. // Proxy completion handler for the group of parallel operatations. Unpacks and
  70. // concatenates the individual operations' results, and invokes the user's
  71. // completion handler.
  72. template <typename Handler, typename... Ops>
  73. struct parallel_group_completion_handler
  74. {
  75. typedef decay_t<
  76. prefer_result_t<
  77. associated_executor_t<Handler>,
  78. execution::outstanding_work_t::tracked_t
  79. >
  80. > executor_type;
  81. parallel_group_completion_handler(Handler&& h)
  82. : handler_(std::move(h)),
  83. executor_(
  84. asio::prefer(
  85. asio::get_associated_executor(handler_),
  86. execution::outstanding_work.tracked))
  87. {
  88. }
  89. executor_type get_executor() const noexcept
  90. {
  91. return executor_;
  92. }
  93. void operator()()
  94. {
  95. this->invoke(asio::detail::make_index_sequence<sizeof...(Ops)>());
  96. }
  97. template <std::size_t... I>
  98. void invoke(asio::detail::index_sequence<I...>)
  99. {
  100. this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
  101. }
  102. template <typename... Args>
  103. void invoke(std::tuple<Args...>&& args)
  104. {
  105. this->invoke(std::move(args),
  106. asio::detail::index_sequence_for<Args...>());
  107. }
  108. template <typename... Args, std::size_t... I>
  109. void invoke(std::tuple<Args...>&& args,
  110. asio::detail::index_sequence<I...>)
  111. {
  112. std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
  113. }
  114. Handler handler_;
  115. executor_type executor_;
  116. std::array<std::size_t, sizeof...(Ops)> completion_order_{};
  117. std::tuple<
  118. parallel_group_op_result<
  119. typename parallel_op_signature_as_tuple<
  120. completion_signature_of_t<Ops>
  121. >::type
  122. >...
  123. > args_{};
  124. };
  125. // Shared state for the parallel group.
  126. template <typename Condition, typename Handler, typename... Ops>
  127. struct parallel_group_state
  128. {
  129. parallel_group_state(Condition&& c, Handler&& h)
  130. : cancellation_condition_(std::move(c)),
  131. handler_(std::move(h))
  132. {
  133. }
  134. // The number of operations that have completed so far. Used to determine the
  135. // order of completion.
  136. std::atomic<unsigned int> completed_{0};
  137. // The non-none cancellation type that resulted from a cancellation condition.
  138. // Stored here for use by the group's initiating function.
  139. std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
  140. // The number of cancellations that have been requested, either on completion
  141. // of the operations within the group, or via the cancellation slot for the
  142. // group operation. Initially set to the number of operations to prevent
  143. // cancellation signals from being emitted until after all of the group's
  144. // operations' initiating functions have completed.
  145. std::atomic<unsigned int> cancellations_requested_{sizeof...(Ops)};
  146. // The number of operations that are yet to complete. Used to determine when
  147. // it is safe to invoke the user's completion handler.
  148. std::atomic<unsigned int> outstanding_{sizeof...(Ops)};
  149. // The cancellation signals for each operation in the group.
  150. asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];
  151. // The cancellation condition is used to determine whether the results from an
  152. // individual operation warrant a cancellation request for the whole group.
  153. Condition cancellation_condition_;
  154. // The proxy handler to be invoked once all operations in the group complete.
  155. parallel_group_completion_handler<Handler, Ops...> handler_;
  156. };
  157. // Handler for an individual operation within the parallel group.
  158. template <std::size_t I, typename Condition, typename Handler, typename... Ops>
  159. struct parallel_group_op_handler
  160. {
  161. typedef asio::cancellation_slot cancellation_slot_type;
  162. parallel_group_op_handler(
  163. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
  164. : state_(std::move(state))
  165. {
  166. }
  167. cancellation_slot_type get_cancellation_slot() const noexcept
  168. {
  169. return state_->cancellation_signals_[I].slot();
  170. }
  171. template <typename... Args>
  172. void operator()(Args... args)
  173. {
  174. // Capture this operation into the completion order.
  175. state_->handler_.completion_order_[state_->completed_++] = I;
  176. // Determine whether the results of this operation require cancellation of
  177. // the whole group.
  178. cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
  179. // Capture the result of the operation into the proxy completion handler.
  180. std::get<I>(state_->handler_.args_).emplace(std::move(args)...);
  181. if (cancel_type != cancellation_type::none)
  182. {
  183. // Save the type for potential use by the group's initiating function.
  184. state_->cancel_type_ = cancel_type;
  185. // If we are the first operation to request cancellation, emit a signal
  186. // for each operation in the group.
  187. if (state_->cancellations_requested_++ == 0)
  188. for (std::size_t i = 0; i < sizeof...(Ops); ++i)
  189. if (i != I)
  190. state_->cancellation_signals_[i].emit(cancel_type);
  191. }
  192. // If this is the last outstanding operation, invoke the user's handler.
  193. if (--state_->outstanding_ == 0)
  194. asio::dispatch(std::move(state_->handler_));
  195. }
  196. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  197. };
  198. // Handler for an individual operation within the parallel group that has an
  199. // explicitly specified executor.
  200. template <typename Executor, std::size_t I,
  201. typename Condition, typename Handler, typename... Ops>
  202. struct parallel_group_op_handler_with_executor :
  203. parallel_group_op_handler<I, Condition, Handler, Ops...>
  204. {
  205. typedef parallel_group_op_handler<I, Condition, Handler, Ops...> base_type;
  206. typedef asio::cancellation_slot cancellation_slot_type;
  207. typedef Executor executor_type;
  208. parallel_group_op_handler_with_executor(
  209. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state,
  210. executor_type ex)
  211. : parallel_group_op_handler<I, Condition, Handler, Ops...>(std::move(state))
  212. {
  213. cancel_proxy_ =
  214. &this->state_->cancellation_signals_[I].slot().template
  215. emplace<cancel_proxy>(this->state_, std::move(ex));
  216. }
  217. cancellation_slot_type get_cancellation_slot() const noexcept
  218. {
  219. return cancel_proxy_->signal_.slot();
  220. }
  221. executor_type get_executor() const noexcept
  222. {
  223. return cancel_proxy_->executor_;
  224. }
  225. // Proxy handler that forwards the emitted signal to the correct executor.
  226. struct cancel_proxy
  227. {
  228. cancel_proxy(
  229. std::shared_ptr<parallel_group_state<
  230. Condition, Handler, Ops...>> state,
  231. executor_type ex)
  232. : state_(std::move(state)),
  233. executor_(std::move(ex))
  234. {
  235. }
  236. void operator()(cancellation_type_t type)
  237. {
  238. if (auto state = state_.lock())
  239. {
  240. asio::cancellation_signal* sig = &signal_;
  241. asio::dispatch(executor_,
  242. [state, sig, type]{ sig->emit(type); });
  243. }
  244. }
  245. std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  246. asio::cancellation_signal signal_;
  247. executor_type executor_;
  248. };
  249. cancel_proxy* cancel_proxy_;
  250. };
  251. // Helper to launch an operation using the correct executor, if any.
  252. template <std::size_t I, typename Op, typename = void>
  253. struct parallel_group_op_launcher
  254. {
  255. template <typename Condition, typename Handler, typename... Ops>
  256. static void launch(Op& op,
  257. const std::shared_ptr<parallel_group_state<
  258. Condition, Handler, Ops...>>& state)
  259. {
  260. typedef associated_executor_t<Op> ex_type;
  261. ex_type ex = asio::get_associated_executor(op);
  262. std::move(op)(
  263. parallel_group_op_handler_with_executor<ex_type, I,
  264. Condition, Handler, Ops...>(state, std::move(ex)));
  265. }
  266. };
  267. // Specialised launcher for operations that specify no executor.
  268. template <std::size_t I, typename Op>
  269. struct parallel_group_op_launcher<I, Op,
  270. enable_if_t<
  271. is_same<
  272. typename associated_executor<
  273. Op>::asio_associated_executor_is_unspecialised,
  274. void
  275. >::value
  276. >>
  277. {
  278. template <typename Condition, typename Handler, typename... Ops>
  279. static void launch(Op& op,
  280. const std::shared_ptr<parallel_group_state<
  281. Condition, Handler, Ops...>>& state)
  282. {
  283. std::move(op)(
  284. parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
  285. }
  286. };
  287. template <typename Condition, typename Handler, typename... Ops>
  288. struct parallel_group_cancellation_handler
  289. {
  290. parallel_group_cancellation_handler(
  291. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
  292. : state_(std::move(state))
  293. {
  294. }
  295. void operator()(cancellation_type_t cancel_type)
  296. {
  297. // If we are the first place to request cancellation, i.e. no operation has
  298. // yet completed and requested cancellation, emit a signal for each
  299. // operation in the group.
  300. if (cancel_type != cancellation_type::none)
  301. if (auto state = state_.lock())
  302. if (state->cancellations_requested_++ == 0)
  303. for (std::size_t i = 0; i < sizeof...(Ops); ++i)
  304. state->cancellation_signals_[i].emit(cancel_type);
  305. }
  306. std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  307. };
  308. template <typename Condition, typename Handler,
  309. typename... Ops, std::size_t... I>
  310. void parallel_group_launch(Condition cancellation_condition, Handler handler,
  311. std::tuple<Ops...>& ops, asio::detail::index_sequence<I...>)
  312. {
  313. // Get the user's completion handler's cancellation slot, so that we can allow
  314. // cancellation of the entire group.
  315. associated_cancellation_slot_t<Handler> slot
  316. = asio::get_associated_cancellation_slot(handler);
  317. // Create the shared state for the operation.
  318. typedef parallel_group_state<Condition, Handler, Ops...> state_type;
  319. std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
  320. asio::detail::recycling_allocator<state_type,
  321. asio::detail::thread_info_base::parallel_group_tag>(),
  322. std::move(cancellation_condition), std::move(handler));
  323. // Initiate each individual operation in the group.
  324. int fold[] = { 0,
  325. ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
  326. 0 )...
  327. };
  328. (void)fold;
  329. // Check if any of the operations has already requested cancellation, and if
  330. // so, emit a signal for each operation in the group.
  331. if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
  332. for (auto& signal : state->cancellation_signals_)
  333. signal.emit(state->cancel_type_);
  334. // Register a handler with the user's completion handler's cancellation slot.
  335. if (slot.is_connected())
  336. slot.template emplace<
  337. parallel_group_cancellation_handler<
  338. Condition, Handler, Ops...>>(state);
  339. }
  340. // Proxy completion handler for the ranged group of parallel operatations.
  341. // Unpacks and recombines the individual operations' results, and invokes the
  342. // user's completion handler.
  343. template <typename Handler, typename Op, typename Allocator>
  344. struct ranged_parallel_group_completion_handler
  345. {
  346. typedef decay_t<
  347. prefer_result_t<
  348. associated_executor_t<Handler>,
  349. execution::outstanding_work_t::tracked_t
  350. >
  351. > executor_type;
  352. typedef typename parallel_op_signature_as_tuple<
  353. completion_signature_of_t<Op>
  354. >::type op_tuple_type;
  355. typedef parallel_group_op_result<op_tuple_type> op_result_type;
  356. ranged_parallel_group_completion_handler(Handler&& h,
  357. std::size_t size, const Allocator& allocator)
  358. : handler_(std::move(h)),
  359. executor_(
  360. asio::prefer(
  361. asio::get_associated_executor(handler_),
  362. execution::outstanding_work.tracked)),
  363. allocator_(allocator),
  364. completion_order_(size, 0,
  365. ASIO_REBIND_ALLOC(Allocator, std::size_t)(allocator)),
  366. args_(ASIO_REBIND_ALLOC(Allocator, op_result_type)(allocator))
  367. {
  368. for (std::size_t i = 0; i < size; ++i)
  369. args_.emplace_back();
  370. }
  371. executor_type get_executor() const noexcept
  372. {
  373. return executor_;
  374. }
  375. void operator()()
  376. {
  377. this->invoke(
  378. asio::detail::make_index_sequence<
  379. std::tuple_size<op_tuple_type>::value>());
  380. }
  381. template <std::size_t... I>
  382. void invoke(asio::detail::index_sequence<I...>)
  383. {
  384. typedef typename parallel_op_signature_as_tuple<
  385. typename ranged_parallel_group_signature<
  386. completion_signature_of_t<Op>,
  387. Allocator
  388. >::raw_type
  389. >::type vectors_type;
  390. // Construct all result vectors using the supplied allocator.
  391. vectors_type vectors{
  392. typename std::tuple_element<I, vectors_type>::type(
  393. ASIO_REBIND_ALLOC(Allocator, int)(allocator_))...};
  394. // Reserve sufficient space in each of the result vectors.
  395. int reserve_fold[] = { 0,
  396. ( std::get<I>(vectors).reserve(completion_order_.size()),
  397. 0 )...
  398. };
  399. (void)reserve_fold;
  400. // Copy the results from all operations into the result vectors.
  401. for (std::size_t idx = 0; idx < completion_order_.size(); ++idx)
  402. {
  403. int pushback_fold[] = { 0,
  404. ( std::get<I>(vectors).push_back(
  405. std::move(std::get<I>(args_[idx].get()))),
  406. 0 )...
  407. };
  408. (void)pushback_fold;
  409. }
  410. std::move(handler_)(completion_order_, std::move(std::get<I>(vectors))...);
  411. }
  412. Handler handler_;
  413. executor_type executor_;
  414. Allocator allocator_;
  415. std::vector<std::size_t,
  416. ASIO_REBIND_ALLOC(Allocator, std::size_t)> completion_order_;
  417. std::deque<op_result_type,
  418. ASIO_REBIND_ALLOC(Allocator, op_result_type)> args_;
  419. };
  420. // Shared state for the parallel group.
  421. template <typename Condition, typename Handler, typename Op, typename Allocator>
  422. struct ranged_parallel_group_state
  423. {
  424. ranged_parallel_group_state(Condition&& c, Handler&& h,
  425. std::size_t size, const Allocator& allocator)
  426. : cancellations_requested_(size),
  427. outstanding_(size),
  428. cancellation_signals_(
  429. ASIO_REBIND_ALLOC(Allocator,
  430. asio::cancellation_signal)(allocator)),
  431. cancellation_condition_(std::move(c)),
  432. handler_(std::move(h), size, allocator)
  433. {
  434. for (std::size_t i = 0; i < size; ++i)
  435. cancellation_signals_.emplace_back();
  436. }
  437. // The number of operations that have completed so far. Used to determine the
  438. // order of completion.
  439. std::atomic<unsigned int> completed_{0};
  440. // The non-none cancellation type that resulted from a cancellation condition.
  441. // Stored here for use by the group's initiating function.
  442. std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
  443. // The number of cancellations that have been requested, either on completion
  444. // of the operations within the group, or via the cancellation slot for the
  445. // group operation. Initially set to the number of operations to prevent
  446. // cancellation signals from being emitted until after all of the group's
  447. // operations' initiating functions have completed.
  448. std::atomic<unsigned int> cancellations_requested_;
  449. // The number of operations that are yet to complete. Used to determine when
  450. // it is safe to invoke the user's completion handler.
  451. std::atomic<unsigned int> outstanding_;
  452. // The cancellation signals for each operation in the group.
  453. std::deque<asio::cancellation_signal,
  454. ASIO_REBIND_ALLOC(Allocator, asio::cancellation_signal)>
  455. cancellation_signals_;
  456. // The cancellation condition is used to determine whether the results from an
  457. // individual operation warrant a cancellation request for the whole group.
  458. Condition cancellation_condition_;
  459. // The proxy handler to be invoked once all operations in the group complete.
  460. ranged_parallel_group_completion_handler<Handler, Op, Allocator> handler_;
  461. };
  462. // Handler for an individual operation within the parallel group.
  463. template <typename Condition, typename Handler, typename Op, typename Allocator>
  464. struct ranged_parallel_group_op_handler
  465. {
  466. typedef asio::cancellation_slot cancellation_slot_type;
  467. ranged_parallel_group_op_handler(
  468. std::shared_ptr<ranged_parallel_group_state<
  469. Condition, Handler, Op, Allocator>> state,
  470. std::size_t idx)
  471. : state_(std::move(state)),
  472. idx_(idx)
  473. {
  474. }
  475. cancellation_slot_type get_cancellation_slot() const noexcept
  476. {
  477. return state_->cancellation_signals_[idx_].slot();
  478. }
  479. template <typename... Args>
  480. void operator()(Args... args)
  481. {
  482. // Capture this operation into the completion order.
  483. state_->handler_.completion_order_[state_->completed_++] = idx_;
  484. // Determine whether the results of this operation require cancellation of
  485. // the whole group.
  486. cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
  487. // Capture the result of the operation into the proxy completion handler.
  488. state_->handler_.args_[idx_].emplace(std::move(args)...);
  489. if (cancel_type != cancellation_type::none)
  490. {
  491. // Save the type for potential use by the group's initiating function.
  492. state_->cancel_type_ = cancel_type;
  493. // If we are the first operation to request cancellation, emit a signal
  494. // for each operation in the group.
  495. if (state_->cancellations_requested_++ == 0)
  496. for (std::size_t i = 0; i < state_->cancellation_signals_.size(); ++i)
  497. if (i != idx_)
  498. state_->cancellation_signals_[i].emit(cancel_type);
  499. }
  500. // If this is the last outstanding operation, invoke the user's handler.
  501. if (--state_->outstanding_ == 0)
  502. asio::dispatch(std::move(state_->handler_));
  503. }
  504. std::shared_ptr<ranged_parallel_group_state<
  505. Condition, Handler, Op, Allocator>> state_;
  506. std::size_t idx_;
  507. };
  508. // Handler for an individual operation within the parallel group that has an
  509. // explicitly specified executor.
  510. template <typename Executor, typename Condition,
  511. typename Handler, typename Op, typename Allocator>
  512. struct ranged_parallel_group_op_handler_with_executor :
  513. ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>
  514. {
  515. typedef ranged_parallel_group_op_handler<
  516. Condition, Handler, Op, Allocator> base_type;
  517. typedef asio::cancellation_slot cancellation_slot_type;
  518. typedef Executor executor_type;
  519. ranged_parallel_group_op_handler_with_executor(
  520. std::shared_ptr<ranged_parallel_group_state<
  521. Condition, Handler, Op, Allocator>> state,
  522. executor_type ex, std::size_t idx)
  523. : ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>(
  524. std::move(state), idx)
  525. {
  526. cancel_proxy_ =
  527. &this->state_->cancellation_signals_[idx].slot().template
  528. emplace<cancel_proxy>(this->state_, std::move(ex));
  529. }
  530. cancellation_slot_type get_cancellation_slot() const noexcept
  531. {
  532. return cancel_proxy_->signal_.slot();
  533. }
  534. executor_type get_executor() const noexcept
  535. {
  536. return cancel_proxy_->executor_;
  537. }
  538. // Proxy handler that forwards the emitted signal to the correct executor.
  539. struct cancel_proxy
  540. {
  541. cancel_proxy(
  542. std::shared_ptr<ranged_parallel_group_state<
  543. Condition, Handler, Op, Allocator>> state,
  544. executor_type ex)
  545. : state_(std::move(state)),
  546. executor_(std::move(ex))
  547. {
  548. }
  549. void operator()(cancellation_type_t type)
  550. {
  551. if (auto state = state_.lock())
  552. {
  553. asio::cancellation_signal* sig = &signal_;
  554. asio::dispatch(executor_,
  555. [state, sig, type]{ sig->emit(type); });
  556. }
  557. }
  558. std::weak_ptr<ranged_parallel_group_state<
  559. Condition, Handler, Op, Allocator>> state_;
  560. asio::cancellation_signal signal_;
  561. executor_type executor_;
  562. };
  563. cancel_proxy* cancel_proxy_;
  564. };
  565. template <typename Condition, typename Handler, typename Op, typename Allocator>
  566. struct ranged_parallel_group_cancellation_handler
  567. {
  568. ranged_parallel_group_cancellation_handler(
  569. std::shared_ptr<ranged_parallel_group_state<
  570. Condition, Handler, Op, Allocator>> state)
  571. : state_(std::move(state))
  572. {
  573. }
  574. void operator()(cancellation_type_t cancel_type)
  575. {
  576. // If we are the first place to request cancellation, i.e. no operation has
  577. // yet completed and requested cancellation, emit a signal for each
  578. // operation in the group.
  579. if (cancel_type != cancellation_type::none)
  580. if (auto state = state_.lock())
  581. if (state->cancellations_requested_++ == 0)
  582. for (std::size_t i = 0; i < state->cancellation_signals_.size(); ++i)
  583. state->cancellation_signals_[i].emit(cancel_type);
  584. }
  585. std::weak_ptr<ranged_parallel_group_state<
  586. Condition, Handler, Op, Allocator>> state_;
  587. };
  588. template <typename Condition, typename Handler,
  589. typename Range, typename Allocator>
  590. void ranged_parallel_group_launch(Condition cancellation_condition,
  591. Handler handler, Range&& range, const Allocator& allocator)
  592. {
  593. // Get the user's completion handler's cancellation slot, so that we can allow
  594. // cancellation of the entire group.
  595. associated_cancellation_slot_t<Handler> slot
  596. = asio::get_associated_cancellation_slot(handler);
  597. // The type of the asynchronous operation.
  598. typedef decay_t<decltype(*declval<typename Range::iterator>())> op_type;
  599. // Create the shared state for the operation.
  600. typedef ranged_parallel_group_state<Condition,
  601. Handler, op_type, Allocator> state_type;
  602. std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
  603. asio::detail::recycling_allocator<state_type,
  604. asio::detail::thread_info_base::parallel_group_tag>(),
  605. std::move(cancellation_condition),
  606. std::move(handler), range.size(), allocator);
  607. std::size_t idx = 0;
  608. for (auto&& op : std::forward<Range>(range))
  609. {
  610. typedef associated_executor_t<op_type> ex_type;
  611. ex_type ex = asio::get_associated_executor(op);
  612. std::move(op)(
  613. ranged_parallel_group_op_handler_with_executor<
  614. ex_type, Condition, Handler, op_type, Allocator>(
  615. state, std::move(ex), idx++));
  616. }
  617. // Check if any of the operations has already requested cancellation, and if
  618. // so, emit a signal for each operation in the group.
  619. if ((state->cancellations_requested_ -= range.size()) > 0)
  620. for (auto& signal : state->cancellation_signals_)
  621. signal.emit(state->cancel_type_);
  622. // Register a handler with the user's completion handler's cancellation slot.
  623. if (slot.is_connected())
  624. slot.template emplace<
  625. ranged_parallel_group_cancellation_handler<
  626. Condition, Handler, op_type, Allocator>>(state);
  627. }
  628. } // namespace detail
  629. } // namespace experimental
  630. template <template <typename, typename> class Associator,
  631. typename Handler, typename... Ops, typename DefaultCandidate>
  632. struct associator<Associator,
  633. experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
  634. DefaultCandidate>
  635. : Associator<Handler, DefaultCandidate>
  636. {
  637. static typename Associator<Handler, DefaultCandidate>::type get(
  638. const experimental::detail::parallel_group_completion_handler<
  639. Handler, Ops...>& h) noexcept
  640. {
  641. return Associator<Handler, DefaultCandidate>::get(h.handler_);
  642. }
  643. static auto get(
  644. const experimental::detail::parallel_group_completion_handler<
  645. Handler, Ops...>& h,
  646. const DefaultCandidate& c) noexcept
  647. -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
  648. {
  649. return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
  650. }
  651. };
  652. template <template <typename, typename> class Associator, typename Handler,
  653. typename Op, typename Allocator, typename DefaultCandidate>
  654. struct associator<Associator,
  655. experimental::detail::ranged_parallel_group_completion_handler<
  656. Handler, Op, Allocator>,
  657. DefaultCandidate>
  658. : Associator<Handler, DefaultCandidate>
  659. {
  660. static typename Associator<Handler, DefaultCandidate>::type get(
  661. const experimental::detail::ranged_parallel_group_completion_handler<
  662. Handler, Op, Allocator>& h) noexcept
  663. {
  664. return Associator<Handler, DefaultCandidate>::get(h.handler_);
  665. }
  666. static auto get(
  667. const experimental::detail::ranged_parallel_group_completion_handler<
  668. Handler, Op, Allocator>& h,
  669. const DefaultCandidate& c) noexcept
  670. -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
  671. {
  672. return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
  673. }
  674. };
  675. } // namespace asio
  676. #include "asio/detail/pop_options.hpp"
  677. #endif // ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP