strand_executor_service.hpp 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. //
  2. // detail/impl/strand_executor_service.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_HPP
  11. #define ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/fenced_block.hpp"
  16. #include "asio/detail/recycling_allocator.hpp"
  17. #include "asio/executor_work_guard.hpp"
  18. #include "asio/defer.hpp"
  19. #include "asio/dispatch.hpp"
  20. #include "asio/post.hpp"
  21. #include "asio/detail/push_options.hpp"
  22. namespace asio {
  23. namespace detail {
  24. template <typename F, typename Allocator>
  25. class strand_executor_service::allocator_binder
  26. {
  27. public:
  28. typedef Allocator allocator_type;
  29. allocator_binder(F&& f, const Allocator& a)
  30. : f_(static_cast<F&&>(f)),
  31. allocator_(a)
  32. {
  33. }
  34. allocator_binder(const allocator_binder& other)
  35. : f_(other.f_),
  36. allocator_(other.allocator_)
  37. {
  38. }
  39. allocator_binder(allocator_binder&& other)
  40. : f_(static_cast<F&&>(other.f_)),
  41. allocator_(static_cast<allocator_type&&>(other.allocator_))
  42. {
  43. }
  44. allocator_type get_allocator() const noexcept
  45. {
  46. return allocator_;
  47. }
  48. void operator()()
  49. {
  50. f_();
  51. }
  52. private:
  53. F f_;
  54. allocator_type allocator_;
  55. };
  56. template <typename Executor>
  57. class strand_executor_service::invoker<Executor,
  58. enable_if_t<
  59. execution::is_executor<Executor>::value
  60. >>
  61. {
  62. public:
  63. invoker(const implementation_type& impl, Executor& ex)
  64. : impl_(impl),
  65. executor_(asio::prefer(ex, execution::outstanding_work.tracked))
  66. {
  67. }
  68. invoker(const invoker& other)
  69. : impl_(other.impl_),
  70. executor_(other.executor_)
  71. {
  72. }
  73. invoker(invoker&& other)
  74. : impl_(static_cast<implementation_type&&>(other.impl_)),
  75. executor_(static_cast<executor_type&&>(other.executor_))
  76. {
  77. }
  78. struct on_invoker_exit
  79. {
  80. invoker* this_;
  81. ~on_invoker_exit()
  82. {
  83. if (push_waiting_to_ready(this_->impl_))
  84. {
  85. recycling_allocator<void> allocator;
  86. executor_type ex = this_->executor_;
  87. asio::prefer(
  88. asio::require(
  89. static_cast<executor_type&&>(ex),
  90. execution::blocking.never),
  91. execution::allocator(allocator)
  92. ).execute(static_cast<invoker&&>(*this_));
  93. }
  94. }
  95. };
  96. void operator()()
  97. {
  98. // Ensure the next handler, if any, is scheduled on block exit.
  99. on_invoker_exit on_exit = { this };
  100. (void)on_exit;
  101. run_ready_handlers(impl_);
  102. }
  103. private:
  104. typedef decay_t<
  105. prefer_result_t<
  106. Executor,
  107. execution::outstanding_work_t::tracked_t
  108. >
  109. > executor_type;
  110. implementation_type impl_;
  111. executor_type executor_;
  112. };
  113. #if !defined(ASIO_NO_TS_EXECUTORS)
  114. template <typename Executor>
  115. class strand_executor_service::invoker<Executor,
  116. enable_if_t<
  117. !execution::is_executor<Executor>::value
  118. >>
  119. {
  120. public:
  121. invoker(const implementation_type& impl, Executor& ex)
  122. : impl_(impl),
  123. work_(ex)
  124. {
  125. }
  126. invoker(const invoker& other)
  127. : impl_(other.impl_),
  128. work_(other.work_)
  129. {
  130. }
  131. invoker(invoker&& other)
  132. : impl_(static_cast<implementation_type&&>(other.impl_)),
  133. work_(static_cast<executor_work_guard<Executor>&&>(other.work_))
  134. {
  135. }
  136. struct on_invoker_exit
  137. {
  138. invoker* this_;
  139. ~on_invoker_exit()
  140. {
  141. if (push_waiting_to_ready(this_->impl_))
  142. {
  143. Executor ex(this_->work_.get_executor());
  144. recycling_allocator<void> allocator;
  145. ex.post(static_cast<invoker&&>(*this_), allocator);
  146. }
  147. }
  148. };
  149. void operator()()
  150. {
  151. // Ensure the next handler, if any, is scheduled on block exit.
  152. on_invoker_exit on_exit = { this };
  153. (void)on_exit;
  154. run_ready_handlers(impl_);
  155. }
  156. private:
  157. implementation_type impl_;
  158. executor_work_guard<Executor> work_;
  159. };
  160. #endif // !defined(ASIO_NO_TS_EXECUTORS)
  161. template <typename Executor, typename Function>
  162. inline void strand_executor_service::execute(const implementation_type& impl,
  163. Executor& ex, Function&& function,
  164. enable_if_t<
  165. can_query<Executor, execution::allocator_t<void>>::value
  166. >*)
  167. {
  168. return strand_executor_service::do_execute(impl, ex,
  169. static_cast<Function&&>(function),
  170. asio::query(ex, execution::allocator));
  171. }
  172. template <typename Executor, typename Function>
  173. inline void strand_executor_service::execute(const implementation_type& impl,
  174. Executor& ex, Function&& function,
  175. enable_if_t<
  176. !can_query<Executor, execution::allocator_t<void>>::value
  177. >*)
  178. {
  179. return strand_executor_service::do_execute(impl, ex,
  180. static_cast<Function&&>(function),
  181. std::allocator<void>());
  182. }
  183. template <typename Executor, typename Function, typename Allocator>
  184. void strand_executor_service::do_execute(const implementation_type& impl,
  185. Executor& ex, Function&& function, const Allocator& a)
  186. {
  187. typedef decay_t<Function> function_type;
  188. // If the executor is not never-blocking, and we are already in the strand,
  189. // then the function can run immediately.
  190. if (asio::query(ex, execution::blocking) != execution::blocking.never
  191. && running_in_this_thread(impl))
  192. {
  193. // Make a local, non-const copy of the function.
  194. function_type tmp(static_cast<Function&&>(function));
  195. fenced_block b(fenced_block::full);
  196. static_cast<function_type&&>(tmp)();
  197. return;
  198. }
  199. // Allocate and construct an operation to wrap the function.
  200. typedef executor_op<function_type, Allocator> op;
  201. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  202. p.p = new (p.v) op(static_cast<Function&&>(function), a);
  203. ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
  204. "strand_executor", impl.get(), 0, "execute"));
  205. // Add the function to the strand and schedule the strand if required.
  206. bool first = enqueue(impl, p.p);
  207. p.v = p.p = 0;
  208. if (first)
  209. {
  210. ex.execute(invoker<Executor>(impl, ex));
  211. }
  212. }
  213. template <typename Executor, typename Function, typename Allocator>
  214. void strand_executor_service::dispatch(const implementation_type& impl,
  215. Executor& ex, Function&& function, const Allocator& a)
  216. {
  217. typedef decay_t<Function> function_type;
  218. // If we are already in the strand then the function can run immediately.
  219. if (running_in_this_thread(impl))
  220. {
  221. // Make a local, non-const copy of the function.
  222. function_type tmp(static_cast<Function&&>(function));
  223. fenced_block b(fenced_block::full);
  224. static_cast<function_type&&>(tmp)();
  225. return;
  226. }
  227. // Allocate and construct an operation to wrap the function.
  228. typedef executor_op<function_type, Allocator> op;
  229. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  230. p.p = new (p.v) op(static_cast<Function&&>(function), a);
  231. ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
  232. "strand_executor", impl.get(), 0, "dispatch"));
  233. // Add the function to the strand and schedule the strand if required.
  234. bool first = enqueue(impl, p.p);
  235. p.v = p.p = 0;
  236. if (first)
  237. {
  238. asio::dispatch(ex,
  239. allocator_binder<invoker<Executor>, Allocator>(
  240. invoker<Executor>(impl, ex), a));
  241. }
  242. }
  243. // Request invocation of the given function and return immediately.
  244. template <typename Executor, typename Function, typename Allocator>
  245. void strand_executor_service::post(const implementation_type& impl,
  246. Executor& ex, Function&& function, const Allocator& a)
  247. {
  248. typedef decay_t<Function> function_type;
  249. // Allocate and construct an operation to wrap the function.
  250. typedef executor_op<function_type, Allocator> op;
  251. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  252. p.p = new (p.v) op(static_cast<Function&&>(function), a);
  253. ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
  254. "strand_executor", impl.get(), 0, "post"));
  255. // Add the function to the strand and schedule the strand if required.
  256. bool first = enqueue(impl, p.p);
  257. p.v = p.p = 0;
  258. if (first)
  259. {
  260. asio::post(ex,
  261. allocator_binder<invoker<Executor>, Allocator>(
  262. invoker<Executor>(impl, ex), a));
  263. }
  264. }
  265. // Request invocation of the given function and return immediately.
  266. template <typename Executor, typename Function, typename Allocator>
  267. void strand_executor_service::defer(const implementation_type& impl,
  268. Executor& ex, Function&& function, const Allocator& a)
  269. {
  270. typedef decay_t<Function> function_type;
  271. // Allocate and construct an operation to wrap the function.
  272. typedef executor_op<function_type, Allocator> op;
  273. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  274. p.p = new (p.v) op(static_cast<Function&&>(function), a);
  275. ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
  276. "strand_executor", impl.get(), 0, "defer"));
  277. // Add the function to the strand and schedule the strand if required.
  278. bool first = enqueue(impl, p.p);
  279. p.v = p.p = 0;
  280. if (first)
  281. {
  282. asio::defer(ex,
  283. allocator_binder<invoker<Executor>, Allocator>(
  284. invoker<Executor>(impl, ex), a));
  285. }
  286. }
  287. } // namespace detail
  288. } // namespace asio
  289. #include "asio/detail/pop_options.hpp"
  290. #endif // ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_HPP