event_queue_cp.hpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. /*
  2. * Copyright (c) 2017-2023 zhllxt
  3. *
  4. * author : zhllxt
  5. * email : 37792738@qq.com
  6. *
  7. * Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. * file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. */
  10. #ifndef __ASIO2_EVENT_QUEUE_COMPONENT_HPP__
  11. #define __ASIO2_EVENT_QUEUE_COMPONENT_HPP__
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. #pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <cstdint>
  16. #include <memory>
  17. #include <functional>
  18. #include <string>
  19. #include <future>
  20. #include <queue>
  21. #include <tuple>
  22. #include <utility>
  23. #include <string_view>
  24. #include <asio2/base/iopool.hpp>
  25. #include <asio2/base/detail/util.hpp>
  26. #include <asio2/base/detail/function_traits.hpp>
  27. #include <asio2/base/detail/buffer_wrap.hpp>
  28. #include <asio2/base/detail/function.hpp>
  29. #ifndef ASIO2_EVENT_QUEUE_STACK_MAX_SIZE
  30. #define ASIO2_EVENT_QUEUE_STACK_MAX_SIZE 256
  31. #endif
  32. namespace asio2::detail
  33. {
  34. template <class, class> class event_queue_cp;
  35. template <class... > class defer_event;
  36. template<class derived_t>
  37. class event_queue_guard
  38. {
  39. template <class, class> friend class event_queue_cp;
  40. template <class... > friend class defer_event;
  41. public:
  42. explicit event_queue_guard() noexcept
  43. {
  44. }
  45. explicit event_queue_guard(std::nullptr_t) noexcept
  46. {
  47. }
  48. protected:
  49. // the valid guard can only be created by event_queue_cp
  50. explicit event_queue_guard(derived_t& d) noexcept
  51. : derive(std::addressof(d)), derive_ptr_(d.selfptr()), valid_(true)
  52. {
  53. }
  54. public:
  55. inline event_queue_guard(event_queue_guard&& o) noexcept
  56. : derive(o.derive), derive_ptr_(std::move(o.derive_ptr_)), valid_(o.valid_)
  57. {
  58. o.valid_ = false;
  59. }
  60. event_queue_guard(const event_queue_guard&) = delete;
  61. event_queue_guard& operator=(const event_queue_guard&) = delete;
  62. event_queue_guard& operator=(event_queue_guard&&) = delete;
  63. ~event_queue_guard() noexcept
  64. {
  65. if (this->valid_)
  66. {
  67. derive->next_event(std::move(*this));
  68. ASIO2_ASSERT(this->valid_ == false);
  69. }
  70. }
  71. inline bool empty() const noexcept { return (!valid_); }
  72. inline bool is_empty() const noexcept { return (!valid_); }
  73. protected:
  74. derived_t * derive = nullptr;
  75. // must hold the derived object, maybe empty in client
  76. // if didn't hold the derived object, when the callback is executed in the event queue,
  77. // the derived object which holded by the callback maybe destroyed by std::move(), when
  78. // event_queue_guard is destroyed, and will call derive.next_event; the "derive" maybe
  79. // invalid already.
  80. std::shared_ptr<derived_t> derive_ptr_;
  81. // whether the guard is valid, when object is moved by std::move the guard will be invalid
  82. bool valid_ = false;
  83. };
  84. template<class Function>
  85. class [[maybe_unused]] defer_event<Function>
  86. {
  87. template <class...> friend class defer_event;
  88. public:
  89. template<class Fn>
  90. defer_event(Fn&& fn) noexcept
  91. : fn_(std::forward<Fn>(fn)), valid_(true)
  92. {
  93. }
  94. inline defer_event(defer_event&& o) noexcept
  95. : fn_(std::move(o.fn_)), valid_(o.valid_)
  96. {
  97. o.valid_ = false;
  98. };
  99. defer_event(const defer_event&) = delete;
  100. defer_event& operator=(const defer_event&) = delete;
  101. defer_event& operator=(defer_event&&) = delete;
  102. ~defer_event() noexcept
  103. {
  104. if (valid_)
  105. {
  106. valid_ = false;
  107. (fn_)();
  108. }
  109. }
  110. inline bool empty() const noexcept { return (!valid_); }
  111. inline bool is_empty() const noexcept { return (!valid_); }
  112. inline constexpr bool is_event_queue_guard_empty() const noexcept { return true; }
  113. protected:
  114. Function fn_;
  115. bool valid_ = false;
  116. };
  117. template<>
  118. class [[maybe_unused]] defer_event<void>
  119. {
  120. template <class...> friend class defer_event;
  121. public:
  122. defer_event() noexcept {}
  123. defer_event(std::nullptr_t) noexcept {}
  124. defer_event(defer_event&&) noexcept = default;
  125. defer_event& operator=(defer_event&&) noexcept = default;
  126. defer_event(const defer_event&) = delete;
  127. defer_event& operator=(const defer_event&) = delete;
  128. inline constexpr bool empty() const noexcept { return true; }
  129. inline constexpr bool is_empty() const noexcept { return true; }
  130. inline constexpr bool is_event_queue_guard_empty() const noexcept { return true; }
  131. };
  132. // defer event with event queue guard dummy
  133. template<class derived_t>
  134. struct defer_eqg_dummy
  135. {
  136. inline void operator()(event_queue_guard<derived_t>) {}
  137. };
  138. template<class Function, class derived_t>
  139. class [[maybe_unused]] defer_event<Function, derived_t, std::false_type>
  140. {
  141. template <class...> friend class defer_event;
  142. public:
  143. template<class Fn>
  144. defer_event(Fn&& fn, std::nullptr_t) noexcept
  145. : fn_(std::forward<Fn>(fn))
  146. , valid_(true)
  147. {
  148. }
  149. inline defer_event(defer_event&& o) noexcept
  150. : fn_(std::move(o.fn_)), valid_(o.valid_)
  151. {
  152. o.valid_ = false;
  153. };
  154. defer_event(const defer_event&) = delete;
  155. defer_event& operator=(const defer_event&) = delete;
  156. defer_event& operator=(defer_event&&) = delete;
  157. ~defer_event() noexcept
  158. {
  159. if (valid_)
  160. {
  161. valid_ = false;
  162. (fn_)(event_queue_guard<derived_t>());
  163. }
  164. }
  165. inline bool empty() const noexcept { return (!valid_); }
  166. inline bool is_empty() const noexcept { return (!valid_); }
  167. inline constexpr bool is_event_queue_guard_empty() const noexcept { return true; }
  168. inline defer_event<Function, derived_t, std::false_type> move_event() noexcept
  169. {
  170. return std::move(*this);
  171. }
  172. inline event_queue_guard<derived_t> move_guard() noexcept
  173. {
  174. return event_queue_guard<derived_t>();
  175. }
  176. protected:
  177. Function fn_;
  178. bool valid_ = false;
  179. };
  180. template<class Function, class derived_t>
  181. class [[maybe_unused]] defer_event<Function, derived_t, std::true_type>
  182. {
  183. template <class...> friend class defer_event;
  184. public:
  185. template<class Fn, class D = derived_t>
  186. defer_event(Fn&& fn, event_queue_guard<D> guard) noexcept
  187. : fn_(std::forward<Fn>(fn))
  188. , valid_(true)
  189. , guard_(std::move(guard))
  190. {
  191. }
  192. template<class Fn, class D = derived_t>
  193. defer_event(defer_event<Fn, D, std::false_type> o, event_queue_guard<D> guard) noexcept
  194. : fn_ (std::move(o.fn_ ))
  195. , valid_( o.valid_ )
  196. , guard_(std::move(guard ))
  197. {
  198. o.valid_ = false;
  199. }
  200. inline defer_event(defer_event&& o) noexcept
  201. : fn_(std::move(o.fn_)), valid_(o.valid_), guard_(std::move(o.guard_))
  202. {
  203. o.valid_ = false;
  204. };
  205. defer_event(const defer_event&) = delete;
  206. defer_event& operator=(const defer_event&) = delete;
  207. defer_event& operator=(defer_event&&) = delete;
  208. ~defer_event() noexcept
  209. {
  210. if (valid_)
  211. {
  212. valid_ = false;
  213. (fn_)(std::move(guard_));
  214. }
  215. // guard will be destroy at here, then guard's destroctor will be called
  216. }
  217. inline bool empty() const noexcept { return (!valid_); }
  218. inline bool is_empty() const noexcept { return (!valid_); }
  219. inline bool is_event_queue_guard_empty() const noexcept { return guard_.empty(); }
  220. inline defer_event<Function, derived_t, std::false_type> move_event() noexcept
  221. {
  222. defer_event<Function, derived_t, std::false_type> evt(std::move(fn_), nullptr);
  223. evt.valid_ = this->valid_;
  224. this->valid_ = false;
  225. return evt;
  226. }
  227. inline event_queue_guard<derived_t> move_guard() noexcept
  228. {
  229. return std::move(guard_);
  230. }
  231. protected:
  232. Function fn_;
  233. bool valid_ = false;
  234. event_queue_guard<derived_t> guard_;
  235. };
  236. template<class derived_t>
  237. class [[maybe_unused]] defer_event<void, derived_t>
  238. {
  239. template <class...> friend class defer_event;
  240. public:
  241. defer_event() noexcept
  242. {
  243. }
  244. template<class D = derived_t>
  245. defer_event(event_queue_guard<D> guard) noexcept
  246. : guard_(std::move(guard))
  247. {
  248. }
  249. defer_event(defer_event&& o) noexcept = default;
  250. defer_event(const defer_event&) = delete;
  251. defer_event& operator=(const defer_event&) = delete;
  252. defer_event& operator=(defer_event&&) = delete;
  253. ~defer_event() noexcept
  254. {
  255. // guard will be destroy at here, then guard's destroctor will be called
  256. }
  257. inline constexpr bool empty() const noexcept { return true; }
  258. inline constexpr bool is_empty() const noexcept { return true; }
  259. inline bool is_event_queue_guard_empty() const noexcept { return guard_.empty(); }
  260. inline defer_event<defer_eqg_dummy<derived_t>, derived_t, std::false_type> move_event() noexcept
  261. {
  262. defer_event<defer_eqg_dummy<derived_t>, derived_t, std::false_type> evt(
  263. defer_eqg_dummy<derived_t>{}, nullptr);
  264. evt.valid_ = false;
  265. return evt;
  266. }
  267. inline event_queue_guard<derived_t> move_guard() noexcept
  268. {
  269. return std::move(guard_);
  270. }
  271. protected:
  272. event_queue_guard<derived_t> guard_;
  273. };
  274. template<class F>
  275. defer_event(F)->defer_event<F>;
  276. defer_event(std::nullptr_t)->defer_event<void>;
  277. template<class F, class derived_t>
  278. defer_event(F, event_queue_guard<derived_t>)->defer_event<F, derived_t, std::true_type>;
  279. template<class F, class derived_t>
  280. defer_event(defer_event<F, derived_t, std::false_type>, event_queue_guard<derived_t>)->
  281. defer_event<F, derived_t, std::true_type>;
  282. // This will cause error "non-deducible template parameter 'derived_t'" on macos clion
  283. //template<class F, class derived_t>
  284. //defer_event(F, std::nullptr_t)->defer_event<F, derived_t, std::false_type>;
  285. // This will cause error "non-deducible template parameter 'derived_t'" on macos clion
  286. //template<class derived_t>
  287. //defer_event()->defer_event<void, derived_t>;
  288. template<class derived_t>
  289. defer_event(event_queue_guard<derived_t>)->defer_event<void, derived_t>;
  290. template<class derived_t, class args_t = void>
  291. class event_queue_cp
  292. {
  293. protected:
  294. struct event_stack_size_guard
  295. {
  296. std::int16_t& x;
  297. event_stack_size_guard(std::int16_t& n) : x(n)
  298. {
  299. ++x;
  300. }
  301. ~event_stack_size_guard()
  302. {
  303. --x;
  304. }
  305. };
  306. public:
  307. /**
  308. * @brief constructor
  309. */
  310. event_queue_cp() noexcept {}
  311. /**
  312. * @brief destructor
  313. */
  314. ~event_queue_cp() = default;
  315. /**
  316. * @brief Get pending event count in the event queue.
  317. */
  318. inline std::size_t get_pending_event_count() const noexcept
  319. {
  320. return this->events_.size();
  321. }
  322. /**
  323. * post a task to the tail of the event queue
  324. * Callback signature : void()
  325. */
  326. template<class Callback>
  327. inline derived_t& post_queued_event(Callback&& func)
  328. {
  329. using return_type = std::invoke_result_t<Callback>;
  330. derived_t& derive = static_cast<derived_t&>(*this);
  331. std::packaged_task<return_type()> task(std::forward<Callback>(func));
  332. auto fn = [p = derive.selfptr(), t = std::move(task)](event_queue_guard<derived_t> g) mutable
  333. {
  334. detail::ignore_unused(p, g);
  335. t();
  336. };
  337. // Make sure we run on the io_context thread
  338. // beacuse the callback "fn" hold the derived_ptr already,
  339. // so this callback for asio::dispatch don't need hold the derived_ptr again.
  340. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  341. [this, fn = std::move(fn)]() mutable
  342. {
  343. ASIO2_ASSERT(this->events_.size() < std::size_t(32767));
  344. bool empty = this->events_.empty();
  345. this->events_.emplace(std::move(fn));
  346. if (empty)
  347. {
  348. (this->events_.front())(event_queue_guard<derived_t>{static_cast<derived_t&>(*this)});
  349. }
  350. }));
  351. return derive;
  352. }
  353. /**
  354. * post a task to the tail of the event queue
  355. * Callback signature : void()
  356. */
  357. template<class Callback, typename Allocator>
  358. inline auto post_queued_event(Callback&& func, asio::use_future_t<Allocator>) ->
  359. std::future<std::invoke_result_t<Callback>>
  360. {
  361. using return_type = std::invoke_result_t<Callback>;
  362. derived_t& derive = static_cast<derived_t&>(*this);
  363. std::packaged_task<return_type()> task(std::forward<Callback>(func));
  364. std::future<return_type> future = task.get_future();
  365. auto fn = [p = derive.selfptr(), t = std::move(task)](event_queue_guard<derived_t> g) mutable
  366. {
  367. detail::ignore_unused(p, g);
  368. t();
  369. };
  370. // Make sure we run on the io_context thread
  371. // beacuse the callback "fn" hold the derived_ptr already,
  372. // so this callback for asio::dispatch don't need hold the derived_ptr again.
  373. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  374. [this, fn = std::move(fn)]() mutable
  375. {
  376. ASIO2_ASSERT(this->events_.size() < std::size_t(32767));
  377. bool empty = this->events_.empty();
  378. this->events_.emplace(std::move(fn));
  379. if (empty)
  380. {
  381. (this->events_.front())(event_queue_guard<derived_t>{static_cast<derived_t&>(*this)});
  382. }
  383. }));
  384. return future;
  385. }
  386. protected:
  387. /**
  388. * push a task to the tail of the event queue
  389. * Callback signature : void(event_queue_guard<derived_t> g)
  390. * note : the callback must hold the derived_ptr itself
  391. * note : the callback must can't be hold the event_queue_guard, otherwise maybe cause deadlock.
  392. */
  393. template<class Callback>
  394. inline derived_t& push_event(Callback&& func)
  395. {
  396. derived_t& derive = static_cast<derived_t&>(*this);
  397. // Should we use post to ensure that the task must be executed in the order of push_event?
  398. // If we don't do that, example :
  399. // call push_event in thread main with task1 first, call push_event in thread 0 with task2 second,
  400. // beacuse the task1 is not in the thread 0, so the task1 will be enqueued by asio::dispatch, but
  401. // the task2 is in the thread 0, so the task2 will be enqueued directly, In this case,
  402. // task 2 is before task 1 in the queue
  403. #ifndef ASIO2_STRONG_EVENT_ORDER
  404. // manual dispatch has better performance.
  405. // Make sure we run on the io_context thread
  406. if (derive.io_->running_in_this_thread())
  407. {
  408. ASIO2_ASSERT(this->events_.size() < std::size_t(32767));
  409. bool empty = this->events_.empty();
  410. this->events_.emplace(std::forward<Callback>(func));
  411. if (empty)
  412. {
  413. (this->events_.front())(event_queue_guard<derived_t>{derive});
  414. }
  415. return derive;
  416. }
  417. #endif
  418. // beacuse the callback "func" hold the derived_ptr already,
  419. // so this callback for asio::dispatch don't need hold the derived_ptr again.
  420. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  421. [this, func = std::forward<Callback>(func)]() mutable
  422. {
  423. ASIO2_ASSERT(this->events_.size() < std::size_t(32767));
  424. bool empty = this->events_.empty();
  425. this->events_.emplace(std::move(func));
  426. if (empty)
  427. {
  428. (this->events_.front())(event_queue_guard<derived_t>{static_cast<derived_t&>(*this)});
  429. }
  430. }));
  431. return derive;
  432. }
  433. /**
  434. * post a task to the tail of the event queue
  435. * Callback signature : void(event_queue_guard<derived_t> g)
  436. * note : the callback must hold the derived_ptr itself
  437. * note : the callback must can't be hold the event_queue_guard, otherwise maybe cause deadlock.
  438. */
  439. template<class Callback>
  440. inline derived_t& post_event(Callback&& func)
  441. {
  442. derived_t& derive = static_cast<derived_t&>(*this);
  443. // Make sure we run on the io_context thread
  444. // beacuse the callback "func" hold the derived_ptr already,
  445. // so this callback for asio::dispatch don't need hold the derived_ptr again.
  446. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  447. [this, func = std::forward<Callback>(func)]() mutable
  448. {
  449. ASIO2_ASSERT(this->events_.size() < std::size_t(32767));
  450. bool empty = this->events_.empty();
  451. this->events_.emplace(std::move(func));
  452. if (empty)
  453. {
  454. (this->events_.front())(event_queue_guard<derived_t>{static_cast<derived_t&>(*this)});
  455. }
  456. }));
  457. return derive;
  458. }
  459. /**
  460. * dispatch a task
  461. * if the guard is not valid, the task will pushed to the tail of the event queue(like push_event),
  462. * otherwise the task will be executed directly.
  463. * Callback signature : void(event_queue_guard<derived_t> g)
  464. * note : the callback must hold the derived_ptr itself
  465. * note : the callback must can't be hold the event_queue_guard, otherwise maybe cause deadlock.
  466. */
  467. template<class Callback>
  468. inline derived_t& disp_event(Callback&& func, event_queue_guard<derived_t> guard)
  469. {
  470. derived_t& derive = static_cast<derived_t&>(*this);
  471. if (guard.is_empty())
  472. {
  473. derive.push_event(std::forward<Callback>(func));
  474. }
  475. else
  476. {
  477. // when some exception occured, disp_event maybe called in the "catch(){ ... }",
  478. // then this maybe not in the io_context thread.
  479. //ASIO2_ASSERT(derive.io_->running_in_this_thread());
  480. // beacuse the callback "func" hold the derived_ptr already,
  481. // so this callback for asio::dispatch don't need hold the derived_ptr again.
  482. asio::dispatch(derive.io_->context(), make_allocator(derive.wallocator(),
  483. [func = std::forward<Callback>(func), guard = std::move(guard)]() mutable
  484. {
  485. func(std::move(guard));
  486. }));
  487. }
  488. return derive;
  489. }
  490. /**
  491. * Removes an element from the front of the event queue.
  492. * and then execute the next element of the queue.
  493. */
  494. template<typename = void>
  495. inline derived_t& next_event(event_queue_guard<derived_t> g)
  496. {
  497. derived_t& derive = static_cast<derived_t&>(*this);
  498. // manual dispatch has better performance.
  499. // Make sure we run on the io_context thread
  500. if (derive.io_->running_in_this_thread())
  501. {
  502. ASIO2_ASSERT(!g.is_empty());
  503. ASIO2_ASSERT(!this->events_.empty());
  504. if (this->event_stack_size_ < std::int16_t(ASIO2_EVENT_QUEUE_STACK_MAX_SIZE))
  505. {
  506. if (!this->events_.empty())
  507. {
  508. this->events_.pop();
  509. if (!this->events_.empty())
  510. {
  511. event_stack_size_guard sg{ this->event_stack_size_ };
  512. (this->events_.front())(std::move(g));
  513. }
  514. else
  515. {
  516. // must set valid to false, otherwise when g is destroyed, it will enter
  517. // next_event again, this will cause a infinite loop, and cause stack overflow.
  518. g.valid_ = false;
  519. }
  520. }
  521. ASIO2_ASSERT(g.is_empty());
  522. return derive;
  523. }
  524. else
  525. {
  526. std::ignore = true;
  527. }
  528. }
  529. // must hold the derived_ptr, beacuse next_event is called by event_queue_guard, when
  530. // event_queue_guard is destroyed, the event queue and event_queue_guard maybe has't
  531. // hold derived object both.
  532. asio::post(derive.io_->context(), make_allocator(derive.wallocator(),
  533. [this, p = derive.selfptr(), g = std::move(g)]() mutable
  534. {
  535. ASIO2_ASSERT(!g.is_empty());
  536. ASIO2_ASSERT(!this->events_.empty());
  537. if (!this->events_.empty())
  538. {
  539. this->events_.pop();
  540. if (!this->events_.empty())
  541. {
  542. (this->events_.front())(std::move(g));
  543. }
  544. else
  545. {
  546. // must set valid to false, otherwise when g is destroyed, it will enter
  547. // next_event again, this will cause a infinite loop, and cause stack overflow.
  548. g.valid_ = false;
  549. }
  550. }
  551. ASIO2_ASSERT(g.is_empty());
  552. }));
  553. return derive;
  554. }
  555. protected:
  556. std::int16_t event_stack_size_{ std::int16_t(0) };
  557. std::queue<detail::function<
  558. void(event_queue_guard<derived_t>), detail::function_size_traits<args_t>::value>> events_;
  559. };
  560. }
  561. #endif // !__ASIO2_EVENT_QUEUE_COMPONENT_HPP__