parallel_group.hpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
  1. //
  2. // experimental/impl/parallel_group.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
  11. #define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #include <atomic>
  17. #include <deque>
  18. #include <memory>
  19. #include <new>
  20. #include <tuple>
  21. #include <boost/asio/associated_cancellation_slot.hpp>
  22. #include <boost/asio/detail/recycling_allocator.hpp>
  23. #include <boost/asio/detail/type_traits.hpp>
  24. #include <boost/asio/dispatch.hpp>
  25. #include <boost/asio/detail/push_options.hpp>
  26. namespace boost {
  27. namespace asio {
  28. namespace experimental {
  29. namespace detail {
  30. // Stores the result from an individual asynchronous operation.
  31. template <typename T, typename = void>
  32. struct parallel_group_op_result
  33. {
  34. public:
  35. parallel_group_op_result()
  36. : has_value_(false)
  37. {
  38. }
  39. parallel_group_op_result(parallel_group_op_result&& other)
  40. : has_value_(other.has_value_)
  41. {
  42. if (has_value_)
  43. new (&u_.value_) T(std::move(other.get()));
  44. }
  45. ~parallel_group_op_result()
  46. {
  47. if (has_value_)
  48. u_.value_.~T();
  49. }
  50. T& get() noexcept
  51. {
  52. return u_.value_;
  53. }
  54. template <typename... Args>
  55. void emplace(Args&&... args)
  56. {
  57. new (&u_.value_) T(std::forward<Args>(args)...);
  58. has_value_ = true;
  59. }
  60. private:
  61. union u
  62. {
  63. u() {}
  64. ~u() {}
  65. char c_;
  66. T value_;
  67. } u_;
  68. bool has_value_;
  69. };
  70. // Proxy completion handler for the group of parallel operatations. Unpacks and
  71. // concatenates the individual operations' results, and invokes the user's
  72. // completion handler.
  73. template <typename Handler, typename... Ops>
  74. struct parallel_group_completion_handler
  75. {
  76. typedef decay_t<
  77. prefer_result_t<
  78. associated_executor_t<Handler>,
  79. execution::outstanding_work_t::tracked_t
  80. >
  81. > executor_type;
  82. parallel_group_completion_handler(Handler&& h)
  83. : handler_(std::move(h)),
  84. executor_(
  85. boost::asio::prefer(
  86. boost::asio::get_associated_executor(handler_),
  87. execution::outstanding_work.tracked))
  88. {
  89. }
  90. executor_type get_executor() const noexcept
  91. {
  92. return executor_;
  93. }
  94. void operator()()
  95. {
  96. this->invoke(boost::asio::detail::make_index_sequence<sizeof...(Ops)>());
  97. }
  98. template <std::size_t... I>
  99. void invoke(boost::asio::detail::index_sequence<I...>)
  100. {
  101. this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
  102. }
  103. template <typename... Args>
  104. void invoke(std::tuple<Args...>&& args)
  105. {
  106. this->invoke(std::move(args),
  107. boost::asio::detail::index_sequence_for<Args...>());
  108. }
  109. template <typename... Args, std::size_t... I>
  110. void invoke(std::tuple<Args...>&& args,
  111. boost::asio::detail::index_sequence<I...>)
  112. {
  113. std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
  114. }
  115. Handler handler_;
  116. executor_type executor_;
  117. std::array<std::size_t, sizeof...(Ops)> completion_order_{};
  118. std::tuple<
  119. parallel_group_op_result<
  120. typename parallel_op_signature_as_tuple<
  121. completion_signature_of_t<Ops>
  122. >::type
  123. >...
  124. > args_{};
  125. };
  126. // Shared state for the parallel group.
  127. template <typename Condition, typename Handler, typename... Ops>
  128. struct parallel_group_state
  129. {
  130. parallel_group_state(Condition&& c, Handler&& h)
  131. : cancellation_condition_(std::move(c)),
  132. handler_(std::move(h))
  133. {
  134. }
  135. // The number of operations that have completed so far. Used to determine the
  136. // order of completion.
  137. std::atomic<unsigned int> completed_{0};
  138. // The non-none cancellation type that resulted from a cancellation condition.
  139. // Stored here for use by the group's initiating function.
  140. std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
  141. // The number of cancellations that have been requested, either on completion
  142. // of the operations within the group, or via the cancellation slot for the
  143. // group operation. Initially set to the number of operations to prevent
  144. // cancellation signals from being emitted until after all of the group's
  145. // operations' initiating functions have completed.
  146. std::atomic<unsigned int> cancellations_requested_{sizeof...(Ops)};
  147. // The number of operations that are yet to complete. Used to determine when
  148. // it is safe to invoke the user's completion handler.
  149. std::atomic<unsigned int> outstanding_{sizeof...(Ops)};
  150. // The cancellation signals for each operation in the group.
  151. boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];
  152. // The cancellation condition is used to determine whether the results from an
  153. // individual operation warrant a cancellation request for the whole group.
  154. Condition cancellation_condition_;
  155. // The proxy handler to be invoked once all operations in the group complete.
  156. parallel_group_completion_handler<Handler, Ops...> handler_;
  157. };
  158. // Handler for an individual operation within the parallel group.
  159. template <std::size_t I, typename Condition, typename Handler, typename... Ops>
  160. struct parallel_group_op_handler
  161. {
  162. typedef boost::asio::cancellation_slot cancellation_slot_type;
  163. parallel_group_op_handler(
  164. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
  165. : state_(std::move(state))
  166. {
  167. }
  168. cancellation_slot_type get_cancellation_slot() const noexcept
  169. {
  170. return state_->cancellation_signals_[I].slot();
  171. }
  172. template <typename... Args>
  173. void operator()(Args... args)
  174. {
  175. // Capture this operation into the completion order.
  176. state_->handler_.completion_order_[state_->completed_++] = I;
  177. // Determine whether the results of this operation require cancellation of
  178. // the whole group.
  179. cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
  180. // Capture the result of the operation into the proxy completion handler.
  181. std::get<I>(state_->handler_.args_).emplace(std::move(args)...);
  182. if (cancel_type != cancellation_type::none)
  183. {
  184. // Save the type for potential use by the group's initiating function.
  185. state_->cancel_type_ = cancel_type;
  186. // If we are the first operation to request cancellation, emit a signal
  187. // for each operation in the group.
  188. if (state_->cancellations_requested_++ == 0)
  189. for (std::size_t i = 0; i < sizeof...(Ops); ++i)
  190. if (i != I)
  191. state_->cancellation_signals_[i].emit(cancel_type);
  192. }
  193. // If this is the last outstanding operation, invoke the user's handler.
  194. if (--state_->outstanding_ == 0)
  195. boost::asio::dispatch(std::move(state_->handler_));
  196. }
  197. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  198. };
  199. // Handler for an individual operation within the parallel group that has an
  200. // explicitly specified executor.
  201. template <typename Executor, std::size_t I,
  202. typename Condition, typename Handler, typename... Ops>
  203. struct parallel_group_op_handler_with_executor :
  204. parallel_group_op_handler<I, Condition, Handler, Ops...>
  205. {
  206. typedef parallel_group_op_handler<I, Condition, Handler, Ops...> base_type;
  207. typedef boost::asio::cancellation_slot cancellation_slot_type;
  208. typedef Executor executor_type;
  209. parallel_group_op_handler_with_executor(
  210. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state,
  211. executor_type ex)
  212. : parallel_group_op_handler<I, Condition, Handler, Ops...>(std::move(state))
  213. {
  214. cancel_proxy_ =
  215. &this->state_->cancellation_signals_[I].slot().template
  216. emplace<cancel_proxy>(this->state_, std::move(ex));
  217. }
  218. cancellation_slot_type get_cancellation_slot() const noexcept
  219. {
  220. return cancel_proxy_->signal_.slot();
  221. }
  222. executor_type get_executor() const noexcept
  223. {
  224. return cancel_proxy_->executor_;
  225. }
  226. // Proxy handler that forwards the emitted signal to the correct executor.
  227. struct cancel_proxy
  228. {
  229. cancel_proxy(
  230. std::shared_ptr<parallel_group_state<
  231. Condition, Handler, Ops...>> state,
  232. executor_type ex)
  233. : state_(std::move(state)),
  234. executor_(std::move(ex))
  235. {
  236. }
  237. void operator()(cancellation_type_t type)
  238. {
  239. if (auto state = state_.lock())
  240. {
  241. boost::asio::cancellation_signal* sig = &signal_;
  242. boost::asio::dispatch(executor_,
  243. [state, sig, type]{ sig->emit(type); });
  244. }
  245. }
  246. std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  247. boost::asio::cancellation_signal signal_;
  248. executor_type executor_;
  249. };
  250. cancel_proxy* cancel_proxy_;
  251. };
  252. // Helper to launch an operation using the correct executor, if any.
  253. template <std::size_t I, typename Op, typename = void>
  254. struct parallel_group_op_launcher
  255. {
  256. template <typename Condition, typename Handler, typename... Ops>
  257. static void launch(Op& op,
  258. const std::shared_ptr<parallel_group_state<
  259. Condition, Handler, Ops...>>& state)
  260. {
  261. typedef associated_executor_t<Op> ex_type;
  262. ex_type ex = boost::asio::get_associated_executor(op);
  263. std::move(op)(
  264. parallel_group_op_handler_with_executor<ex_type, I,
  265. Condition, Handler, Ops...>(state, std::move(ex)));
  266. }
  267. };
  268. // Specialised launcher for operations that specify no executor.
  269. template <std::size_t I, typename Op>
  270. struct parallel_group_op_launcher<I, Op,
  271. enable_if_t<
  272. is_same<
  273. typename associated_executor<
  274. Op>::asio_associated_executor_is_unspecialised,
  275. void
  276. >::value
  277. >>
  278. {
  279. template <typename Condition, typename Handler, typename... Ops>
  280. static void launch(Op& op,
  281. const std::shared_ptr<parallel_group_state<
  282. Condition, Handler, Ops...>>& state)
  283. {
  284. std::move(op)(
  285. parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
  286. }
  287. };
  288. template <typename Condition, typename Handler, typename... Ops>
  289. struct parallel_group_cancellation_handler
  290. {
  291. parallel_group_cancellation_handler(
  292. std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
  293. : state_(std::move(state))
  294. {
  295. }
  296. void operator()(cancellation_type_t cancel_type)
  297. {
  298. // If we are the first place to request cancellation, i.e. no operation has
  299. // yet completed and requested cancellation, emit a signal for each
  300. // operation in the group.
  301. if (cancel_type != cancellation_type::none)
  302. if (auto state = state_.lock())
  303. if (state->cancellations_requested_++ == 0)
  304. for (std::size_t i = 0; i < sizeof...(Ops); ++i)
  305. state->cancellation_signals_[i].emit(cancel_type);
  306. }
  307. std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
  308. };
  309. template <typename Condition, typename Handler,
  310. typename... Ops, std::size_t... I>
  311. void parallel_group_launch(Condition cancellation_condition, Handler handler,
  312. std::tuple<Ops...>& ops, boost::asio::detail::index_sequence<I...>)
  313. {
  314. // Get the user's completion handler's cancellation slot, so that we can allow
  315. // cancellation of the entire group.
  316. associated_cancellation_slot_t<Handler> slot
  317. = boost::asio::get_associated_cancellation_slot(handler);
  318. // Create the shared state for the operation.
  319. typedef parallel_group_state<Condition, Handler, Ops...> state_type;
  320. std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
  321. boost::asio::detail::recycling_allocator<state_type,
  322. boost::asio::detail::thread_info_base::parallel_group_tag>(),
  323. std::move(cancellation_condition), std::move(handler));
  324. // Initiate each individual operation in the group.
  325. int fold[] = { 0,
  326. ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
  327. 0 )...
  328. };
  329. (void)fold;
  330. // Check if any of the operations has already requested cancellation, and if
  331. // so, emit a signal for each operation in the group.
  332. if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
  333. for (auto& signal : state->cancellation_signals_)
  334. signal.emit(state->cancel_type_);
  335. // Register a handler with the user's completion handler's cancellation slot.
  336. if (slot.is_connected())
  337. slot.template emplace<
  338. parallel_group_cancellation_handler<
  339. Condition, Handler, Ops...>>(state);
  340. }
  341. // Proxy completion handler for the ranged group of parallel operatations.
  342. // Unpacks and recombines the individual operations' results, and invokes the
  343. // user's completion handler.
  344. template <typename Handler, typename Op, typename Allocator>
  345. struct ranged_parallel_group_completion_handler
  346. {
  347. typedef decay_t<
  348. prefer_result_t<
  349. associated_executor_t<Handler>,
  350. execution::outstanding_work_t::tracked_t
  351. >
  352. > executor_type;
  353. typedef typename parallel_op_signature_as_tuple<
  354. completion_signature_of_t<Op>
  355. >::type op_tuple_type;
  356. typedef parallel_group_op_result<op_tuple_type> op_result_type;
  357. ranged_parallel_group_completion_handler(Handler&& h,
  358. std::size_t size, const Allocator& allocator)
  359. : handler_(std::move(h)),
  360. executor_(
  361. boost::asio::prefer(
  362. boost::asio::get_associated_executor(handler_),
  363. execution::outstanding_work.tracked)),
  364. allocator_(allocator),
  365. completion_order_(size, 0,
  366. BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)(allocator)),
  367. args_(BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)(allocator))
  368. {
  369. for (std::size_t i = 0; i < size; ++i)
  370. args_.emplace_back();
  371. }
  372. executor_type get_executor() const noexcept
  373. {
  374. return executor_;
  375. }
  376. void operator()()
  377. {
  378. this->invoke(
  379. boost::asio::detail::make_index_sequence<
  380. std::tuple_size<op_tuple_type>::value>());
  381. }
  382. template <std::size_t... I>
  383. void invoke(boost::asio::detail::index_sequence<I...>)
  384. {
  385. typedef typename parallel_op_signature_as_tuple<
  386. typename ranged_parallel_group_signature<
  387. completion_signature_of_t<Op>,
  388. Allocator
  389. >::raw_type
  390. >::type vectors_type;
  391. // Construct all result vectors using the supplied allocator.
  392. vectors_type vectors{
  393. typename std::tuple_element<I, vectors_type>::type(
  394. BOOST_ASIO_REBIND_ALLOC(Allocator, int)(allocator_))...};
  395. // Reserve sufficient space in each of the result vectors.
  396. int reserve_fold[] = { 0,
  397. ( std::get<I>(vectors).reserve(completion_order_.size()),
  398. 0 )...
  399. };
  400. (void)reserve_fold;
  401. // Copy the results from all operations into the result vectors.
  402. for (std::size_t idx = 0; idx < completion_order_.size(); ++idx)
  403. {
  404. int pushback_fold[] = { 0,
  405. ( std::get<I>(vectors).push_back(
  406. std::move(std::get<I>(args_[idx].get()))),
  407. 0 )...
  408. };
  409. (void)pushback_fold;
  410. }
  411. std::move(handler_)(std::move(completion_order_),
  412. std::move(std::get<I>(vectors))...);
  413. }
  414. Handler handler_;
  415. executor_type executor_;
  416. Allocator allocator_;
  417. std::vector<std::size_t,
  418. BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)> completion_order_;
  419. std::deque<op_result_type,
  420. BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)> args_;
  421. };
  422. // Shared state for the parallel group.
  423. template <typename Condition, typename Handler, typename Op, typename Allocator>
  424. struct ranged_parallel_group_state
  425. {
  426. ranged_parallel_group_state(Condition&& c, Handler&& h,
  427. std::size_t size, const Allocator& allocator)
  428. : cancellations_requested_(size),
  429. outstanding_(size),
  430. cancellation_signals_(
  431. BOOST_ASIO_REBIND_ALLOC(Allocator,
  432. boost::asio::cancellation_signal)(allocator)),
  433. cancellation_condition_(std::move(c)),
  434. handler_(std::move(h), size, allocator)
  435. {
  436. for (std::size_t i = 0; i < size; ++i)
  437. cancellation_signals_.emplace_back();
  438. }
  439. // The number of operations that have completed so far. Used to determine the
  440. // order of completion.
  441. std::atomic<unsigned int> completed_{0};
  442. // The non-none cancellation type that resulted from a cancellation condition.
  443. // Stored here for use by the group's initiating function.
  444. std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
  445. // The number of cancellations that have been requested, either on completion
  446. // of the operations within the group, or via the cancellation slot for the
  447. // group operation. Initially set to the number of operations to prevent
  448. // cancellation signals from being emitted until after all of the group's
  449. // operations' initiating functions have completed.
  450. std::atomic<unsigned int> cancellations_requested_;
  451. // The number of operations that are yet to complete. Used to determine when
  452. // it is safe to invoke the user's completion handler.
  453. std::atomic<unsigned int> outstanding_;
  454. // The cancellation signals for each operation in the group.
  455. std::deque<boost::asio::cancellation_signal,
  456. BOOST_ASIO_REBIND_ALLOC(Allocator, boost::asio::cancellation_signal)>
  457. cancellation_signals_;
  458. // The cancellation condition is used to determine whether the results from an
  459. // individual operation warrant a cancellation request for the whole group.
  460. Condition cancellation_condition_;
  461. // The proxy handler to be invoked once all operations in the group complete.
  462. ranged_parallel_group_completion_handler<Handler, Op, Allocator> handler_;
  463. };
  464. // Handler for an individual operation within the parallel group.
  465. template <typename Condition, typename Handler, typename Op, typename Allocator>
  466. struct ranged_parallel_group_op_handler
  467. {
  468. typedef boost::asio::cancellation_slot cancellation_slot_type;
  469. ranged_parallel_group_op_handler(
  470. std::shared_ptr<ranged_parallel_group_state<
  471. Condition, Handler, Op, Allocator>> state,
  472. std::size_t idx)
  473. : state_(std::move(state)),
  474. idx_(idx)
  475. {
  476. }
  477. cancellation_slot_type get_cancellation_slot() const noexcept
  478. {
  479. return state_->cancellation_signals_[idx_].slot();
  480. }
  481. template <typename... Args>
  482. void operator()(Args... args)
  483. {
  484. // Capture this operation into the completion order.
  485. state_->handler_.completion_order_[state_->completed_++] = idx_;
  486. // Determine whether the results of this operation require cancellation of
  487. // the whole group.
  488. cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
  489. // Capture the result of the operation into the proxy completion handler.
  490. state_->handler_.args_[idx_].emplace(std::move(args)...);
  491. if (cancel_type != cancellation_type::none)
  492. {
  493. // Save the type for potential use by the group's initiating function.
  494. state_->cancel_type_ = cancel_type;
  495. // If we are the first operation to request cancellation, emit a signal
  496. // for each operation in the group.
  497. if (state_->cancellations_requested_++ == 0)
  498. for (std::size_t i = 0; i < state_->cancellation_signals_.size(); ++i)
  499. if (i != idx_)
  500. state_->cancellation_signals_[i].emit(cancel_type);
  501. }
  502. // If this is the last outstanding operation, invoke the user's handler.
  503. if (--state_->outstanding_ == 0)
  504. boost::asio::dispatch(std::move(state_->handler_));
  505. }
  506. std::shared_ptr<ranged_parallel_group_state<
  507. Condition, Handler, Op, Allocator>> state_;
  508. std::size_t idx_;
  509. };
  510. // Handler for an individual operation within the parallel group that has an
  511. // explicitly specified executor.
  512. template <typename Executor, typename Condition,
  513. typename Handler, typename Op, typename Allocator>
  514. struct ranged_parallel_group_op_handler_with_executor :
  515. ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>
  516. {
  517. typedef ranged_parallel_group_op_handler<
  518. Condition, Handler, Op, Allocator> base_type;
  519. typedef boost::asio::cancellation_slot cancellation_slot_type;
  520. typedef Executor executor_type;
  521. ranged_parallel_group_op_handler_with_executor(
  522. std::shared_ptr<ranged_parallel_group_state<
  523. Condition, Handler, Op, Allocator>> state,
  524. executor_type ex, std::size_t idx)
  525. : ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>(
  526. std::move(state), idx)
  527. {
  528. cancel_proxy_ =
  529. &this->state_->cancellation_signals_[idx].slot().template
  530. emplace<cancel_proxy>(this->state_, std::move(ex));
  531. }
  532. cancellation_slot_type get_cancellation_slot() const noexcept
  533. {
  534. return cancel_proxy_->signal_.slot();
  535. }
  536. executor_type get_executor() const noexcept
  537. {
  538. return cancel_proxy_->executor_;
  539. }
  540. // Proxy handler that forwards the emitted signal to the correct executor.
  541. struct cancel_proxy
  542. {
  543. cancel_proxy(
  544. std::shared_ptr<ranged_parallel_group_state<
  545. Condition, Handler, Op, Allocator>> state,
  546. executor_type ex)
  547. : state_(std::move(state)),
  548. executor_(std::move(ex))
  549. {
  550. }
  551. void operator()(cancellation_type_t type)
  552. {
  553. if (auto state = state_.lock())
  554. {
  555. boost::asio::cancellation_signal* sig = &signal_;
  556. boost::asio::dispatch(executor_,
  557. [state, sig, type]{ sig->emit(type); });
  558. }
  559. }
  560. std::weak_ptr<ranged_parallel_group_state<
  561. Condition, Handler, Op, Allocator>> state_;
  562. boost::asio::cancellation_signal signal_;
  563. executor_type executor_;
  564. };
  565. cancel_proxy* cancel_proxy_;
  566. };
  567. template <typename Condition, typename Handler, typename Op, typename Allocator>
  568. struct ranged_parallel_group_cancellation_handler
  569. {
  570. ranged_parallel_group_cancellation_handler(
  571. std::shared_ptr<ranged_parallel_group_state<
  572. Condition, Handler, Op, Allocator>> state)
  573. : state_(std::move(state))
  574. {
  575. }
  576. void operator()(cancellation_type_t cancel_type)
  577. {
  578. // If we are the first place to request cancellation, i.e. no operation has
  579. // yet completed and requested cancellation, emit a signal for each
  580. // operation in the group.
  581. if (cancel_type != cancellation_type::none)
  582. if (auto state = state_.lock())
  583. if (state->cancellations_requested_++ == 0)
  584. for (std::size_t i = 0; i < state->cancellation_signals_.size(); ++i)
  585. state->cancellation_signals_[i].emit(cancel_type);
  586. }
  587. std::weak_ptr<ranged_parallel_group_state<
  588. Condition, Handler, Op, Allocator>> state_;
  589. };
  590. template <typename Condition, typename Handler,
  591. typename Range, typename Allocator>
  592. void ranged_parallel_group_launch(Condition cancellation_condition,
  593. Handler handler, Range&& range, const Allocator& allocator)
  594. {
  595. // Get the user's completion handler's cancellation slot, so that we can allow
  596. // cancellation of the entire group.
  597. associated_cancellation_slot_t<Handler> slot
  598. = boost::asio::get_associated_cancellation_slot(handler);
  599. // The type of the asynchronous operation.
  600. typedef decay_t<decltype(*declval<typename Range::iterator>())> op_type;
  601. // Create the shared state for the operation.
  602. typedef ranged_parallel_group_state<Condition,
  603. Handler, op_type, Allocator> state_type;
  604. std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
  605. boost::asio::detail::recycling_allocator<state_type,
  606. boost::asio::detail::thread_info_base::parallel_group_tag>(),
  607. std::move(cancellation_condition),
  608. std::move(handler), range.size(), allocator);
  609. std::size_t idx = 0;
  610. for (auto&& op : std::forward<Range>(range))
  611. {
  612. typedef associated_executor_t<op_type> ex_type;
  613. ex_type ex = boost::asio::get_associated_executor(op);
  614. std::move(op)(
  615. ranged_parallel_group_op_handler_with_executor<
  616. ex_type, Condition, Handler, op_type, Allocator>(
  617. state, std::move(ex), idx++));
  618. }
  619. // Check if any of the operations has already requested cancellation, and if
  620. // so, emit a signal for each operation in the group.
  621. if ((state->cancellations_requested_ -= range.size()) > 0)
  622. for (auto& signal : state->cancellation_signals_)
  623. signal.emit(state->cancel_type_);
  624. // Register a handler with the user's completion handler's cancellation slot.
  625. if (slot.is_connected())
  626. slot.template emplace<
  627. ranged_parallel_group_cancellation_handler<
  628. Condition, Handler, op_type, Allocator>>(state);
  629. }
  630. } // namespace detail
  631. } // namespace experimental
  632. template <template <typename, typename> class Associator,
  633. typename Handler, typename... Ops, typename DefaultCandidate>
  634. struct associator<Associator,
  635. experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
  636. DefaultCandidate>
  637. : Associator<Handler, DefaultCandidate>
  638. {
  639. static typename Associator<Handler, DefaultCandidate>::type get(
  640. const experimental::detail::parallel_group_completion_handler<
  641. Handler, Ops...>& h) noexcept
  642. {
  643. return Associator<Handler, DefaultCandidate>::get(h.handler_);
  644. }
  645. static auto get(
  646. const experimental::detail::parallel_group_completion_handler<
  647. Handler, Ops...>& h,
  648. const DefaultCandidate& c) noexcept
  649. -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
  650. {
  651. return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
  652. }
  653. };
  654. template <template <typename, typename> class Associator, typename Handler,
  655. typename Op, typename Allocator, typename DefaultCandidate>
  656. struct associator<Associator,
  657. experimental::detail::ranged_parallel_group_completion_handler<
  658. Handler, Op, Allocator>,
  659. DefaultCandidate>
  660. : Associator<Handler, DefaultCandidate>
  661. {
  662. static typename Associator<Handler, DefaultCandidate>::type get(
  663. const experimental::detail::ranged_parallel_group_completion_handler<
  664. Handler, Op, Allocator>& h) noexcept
  665. {
  666. return Associator<Handler, DefaultCandidate>::get(h.handler_);
  667. }
  668. static auto get(
  669. const experimental::detail::ranged_parallel_group_completion_handler<
  670. Handler, Op, Allocator>& h,
  671. const DefaultCandidate& c) noexcept
  672. -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
  673. {
  674. return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
  675. }
  676. };
  677. } // namespace asio
  678. } // namespace boost
  679. #include <boost/asio/detail/pop_options.hpp>
  680. #endif // BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP