post_cp.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_POST_COMPONENT_HPP__
  11. #define __ASIO2_POST_COMPONENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <set>
  16. #include <future>
  17. #include <functional>
  18. #include <asio2/base/iopool.hpp>
  19. #include <asio2/base/detail/allocator.hpp>
  20. namespace asio2::detail
  21. {
  22. template<class derived_t, class args_t = void>
  23. class post_cp
  24. {
  25. public:
  26. /**
  27. * @brief constructor
  28. */
  29. post_cp() {}
  30. /**
  31. * @brief destructor
  32. */
  33. ~post_cp() = default;
  34. public:
  35. /**
  36. * @brief Submits a completion token or function object for execution.
  37. * This function submits an object for execution using the object's associated
  38. * executor. The function object is queued for execution, and is never called
  39. * from the current thread prior to returning from <tt>post()</tt>.
  40. */
  41. template<typename Function>
  42. inline derived_t & post(Function&& fn)
  43. {
  44. derived_t& derive = static_cast<derived_t&>(*this);
  45. // if use call post, but the user callback "fn" has't hold the session_ptr,
  46. // it maybe cause crash, so we need hold the session_ptr again at here.
  47. // if the session_ptr is already destroyed, the selfptr() will cause crash.
  48. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  49. [p = derive.selfptr(), fn = std::forward<Function>(fn)]() mutable
  50. {
  51. detail::ignore_unused(p);
  52. fn();
  53. }));
  54. return derive;
  55. }
  56. /**
  57. * @brief Submits a completion token or function object for execution after specified delay time.
  58. * This function submits an object for execution using the object's associated
  59. * executor. The function object is queued for execution, and is never called
  60. * from the current thread prior to returning from <tt>post()</tt>.
  61. */
  62. template<typename Function, typename Rep, typename Period>
  63. inline derived_t & post(Function&& fn, std::chrono::duration<Rep, Period> delay)
  64. {
  65. derived_t& derive = static_cast<derived_t&>(*this);
  66. // note : need test.
  67. // has't check where the server or client is stopped, if the server is stopping, but the
  68. // iopool's wait_for_io_context_stopped() has't compelete and just at sleep, then user
  69. // call post but don't call stop_all_timed_tasks, it maybe cause the
  70. // wait_for_io_context_stopped() can't compelete forever,and the server.stop or client.stop
  71. // never compeleted.
  72. if (delay > std::chrono::duration_cast<
  73. std::chrono::duration<Rep, Period>>((asio::steady_timer::duration::max)()))
  74. delay = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
  75. (asio::steady_timer::duration::max)());
  76. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  77. [this, &derive, p = derive.selfptr(), fn = std::forward<Function>(fn), delay]() mutable
  78. {
  79. std::unique_ptr<asio::steady_timer> timer = std::make_unique<
  80. asio::steady_timer>(derive.io_->context());
  81. this->timed_tasks_.emplace(timer.get());
  82. derive.io_->timers().emplace(timer.get());
  83. timer->expires_after(delay);
  84. timer->async_wait(
  85. [this, &derive, p = std::move(p), timer = std::move(timer), fn = std::move(fn)]
  86. (const error_code& ec) mutable
  87. {
  88. ASIO2_ASSERT((!ec) || ec == asio::error::operation_aborted);
  89. derive.io_->timers().erase(timer.get());
  90. detail::ignore_unused(p);
  91. set_last_error(ec);
  92. #if defined(ASIO2_ENABLE_TIMER_CALLBACK_WHEN_ERROR)
  93. fn();
  94. #else
  95. if (!ec)
  96. {
  97. fn();
  98. }
  99. #endif
  100. this->timed_tasks_.erase(timer.get());
  101. });
  102. }));
  103. return derive;
  104. }
  105. /**
  106. * @brief Submits a completion token or function object for execution.
  107. * This function submits an object for execution using the object's associated
  108. * executor. The function object is queued for execution, and is never called
  109. * from the current thread prior to returning from <tt>post()</tt>.
  110. * note : Never call future's waiting function(eg:wait,get) in a communication(eg:on_recv)
  111. * thread, it will cause dead lock;
  112. */
  113. template<typename Function, typename Allocator>
  114. inline auto post(Function&& fn, asio::use_future_t<Allocator>) ->
  115. std::future<std::invoke_result_t<Function>>
  116. {
  117. using return_type = std::invoke_result_t<Function>;
  118. derived_t& derive = static_cast<derived_t&>(*this);
  119. std::packaged_task<return_type()> task(std::forward<Function>(fn));
  120. std::future<return_type> future = task.get_future();
  121. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  122. [p = derive.selfptr(), t = std::move(task)]() mutable
  123. {
  124. detail::ignore_unused(p);
  125. t();
  126. }));
  127. return future;
  128. }
  129. /**
  130. * @brief Submits a completion token or function object for execution after specified delay time.
  131. * This function submits an object for execution using the object's associated
  132. * executor. The function object is queued for execution, and is never called
  133. * from the current thread prior to returning from <tt>post()</tt>.
  134. * note : Never call future's waiting function(eg:wait,get) in a communication(eg:on_recv)
  135. * thread, it will cause dead lock;
  136. */
  137. template<typename Function, typename Rep, typename Period, typename Allocator>
  138. inline auto post(Function&& fn, std::chrono::duration<Rep, Period> delay, asio::use_future_t<Allocator>)
  139. -> std::future<std::invoke_result_t<Function>>
  140. {
  141. using return_type = std::invoke_result_t<Function>;
  142. derived_t& derive = static_cast<derived_t&>(*this);
  143. if (delay > std::chrono::duration_cast<
  144. std::chrono::duration<Rep, Period>>((asio::steady_timer::duration::max)()))
  145. delay = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
  146. (asio::steady_timer::duration::max)());
  147. std::packaged_task<return_type()> task(std::forward<Function>(fn));
  148. std::future<return_type> future = task.get_future();
  149. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  150. [this, &derive, p = derive.selfptr(), t = std::move(task), delay]() mutable
  151. {
  152. std::unique_ptr<asio::steady_timer> timer = std::make_unique<
  153. asio::steady_timer>(derive.io_->context());
  154. this->timed_tasks_.emplace(timer.get());
  155. derive.io_->timers().emplace(timer.get());
  156. timer->expires_after(delay);
  157. timer->async_wait(
  158. [this, &derive, p = std::move(p), timer = std::move(timer), t = std::move(t)]
  159. (const error_code& ec) mutable
  160. {
  161. ASIO2_ASSERT((!ec) || ec == asio::error::operation_aborted);
  162. derive.io_->timers().erase(timer.get());
  163. detail::ignore_unused(p);
  164. set_last_error(ec);
  165. #if defined(ASIO2_ENABLE_TIMER_CALLBACK_WHEN_ERROR)
  166. t();
  167. #else
  168. if (!ec)
  169. {
  170. t();
  171. }
  172. #endif
  173. this->timed_tasks_.erase(timer.get());
  174. });
  175. }));
  176. return future;
  177. }
  178. /**
  179. * @brief Submits a completion token or function object for execution.
  180. * This function submits an object for execution using the object's associated
  181. * executor. The function object is queued for execution. if current thread is
  182. * the io_context's thread, the function will be executed immediately, otherwise
  183. * the task will be asynchronously post to io_context to execute.
  184. */
  185. template<typename Function>
  186. inline derived_t & dispatch(Function&& fn)
  187. {
  188. derived_t& derive = static_cast<derived_t&>(*this);
  189. asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
  190. [p = derive.selfptr(), fn = std::forward<Function>(fn)]() mutable
  191. {
  192. detail::ignore_unused(p);
  193. fn();
  194. }));
  195. return derive;
  196. }
  197. /**
  198. * @brief Submits a completion token or function object for execution.
  199. * This function submits an object for execution using the object's associated
  200. * executor. The function object is queued for execution. if current thread is
  201. * the io_context's thread, the function will be executed immediately, otherwise
  202. * the task will be asynchronously post to io_context to execute.
  203. * note : Never call future's waiting function(eg:wait,get) in a communication(eg:on_recv)
  204. * thread, it will cause dead lock;
  205. */
  206. template<typename Function, typename Allocator>
  207. inline auto dispatch(Function&& fn, asio::use_future_t<Allocator>) ->
  208. std::future<std::invoke_result_t<Function>>
  209. {
  210. using return_type = std::invoke_result_t<Function>;
  211. derived_t& derive = static_cast<derived_t&>(*this);
  212. std::packaged_task<return_type()> task(std::forward<Function>(fn));
  213. std::future<return_type> future = task.get_future();
  214. // Make sure we run on the io_context thread
  215. asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
  216. [p = derive.selfptr(), t = std::move(task)]() mutable
  217. {
  218. detail::ignore_unused(p);
  219. t();
  220. }));
  221. return future;
  222. }
  223. /**
  224. * @brief Stop all timed events which you posted with a delay duration.
  225. */
  226. inline derived_t& stop_all_timed_events()
  227. {
  228. derived_t& derive = static_cast<derived_t&>(*this);
  229. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  230. [this, p = derive.selfptr()]() mutable
  231. {
  232. detail::ignore_unused(p);
  233. for (asio::steady_timer* timer : this->timed_tasks_)
  234. {
  235. detail::cancel_timer(*timer);
  236. }
  237. }));
  238. return derive;
  239. }
  240. /**
  241. * @brief Stop all timed tasks which you posted with a delay duration.
  242. * This function is the same as stop_all_timed_events.
  243. */
  244. inline derived_t& stop_all_timed_tasks()
  245. {
  246. derived_t& derive = static_cast<derived_t&>(*this);
  247. return derive.stop_all_timed_events();
  248. }
  249. protected:
  250. /**
  251. * @brief Stop all timed events which you posted with a delay duration.
  252. * Use dispatch instead of post, this function is used for inner.
  253. */
  254. inline derived_t& _dispatch_stop_all_timed_events()
  255. {
  256. derived_t& derive = static_cast<derived_t&>(*this);
  257. asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
  258. [this, p = derive.selfptr()]() mutable
  259. {
  260. detail::ignore_unused(p);
  261. for (asio::steady_timer* timer : this->timed_tasks_)
  262. {
  263. detail::cancel_timer(*timer);
  264. }
  265. }));
  266. return derive;
  267. }
  268. protected:
  269. /// Used to exit the timer tasks when component is ready to stop.
  270. std::set<asio::steady_timer*> timed_tasks_;
  271. };
  272. }
  273. #endif // !__ASIO2_POST_COMPONENT_HPP__