// scheduler.ipp
  1. //
  2. // detail/impl/scheduler.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
  11. #define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #include <boost/asio/detail/concurrency_hint.hpp>
  17. #include <boost/asio/detail/event.hpp>
  18. #include <boost/asio/detail/limits.hpp>
  19. #include <boost/asio/detail/scheduler.hpp>
  20. #include <boost/asio/detail/scheduler_thread_info.hpp>
  21. #include <boost/asio/detail/signal_blocker.hpp>
  22. #if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  23. # include <boost/asio/detail/io_uring_service.hpp>
  24. #else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  25. # include <boost/asio/detail/reactor.hpp>
  26. #endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  27. #include <boost/asio/detail/push_options.hpp>
  28. namespace boost {
  29. namespace asio {
  30. namespace detail {
  31. class scheduler::thread_function
  32. {
  33. public:
  34. explicit thread_function(scheduler* s)
  35. : this_(s)
  36. {
  37. }
  38. void operator()()
  39. {
  40. boost::system::error_code ec;
  41. this_->run(ec);
  42. }
  43. private:
  44. scheduler* this_;
  45. };
// Scope guard used around a call to the task's run(). On destruction it
// folds this thread's privately counted work into the scheduler's shared
// count, then reacquires the lock and pushes the thread-private completions
// and the task sentinel back on to the shared operation queue.
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      // Publish work started while the task ran without the lock held.
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;       // Scheduler whose state is being restored.
  mutex::scoped_lock* lock_;   // Reacquired here; left locked on exit.
  thread_info* this_thread_;   // Per-thread private queue and work count.
};
// Scope guard used around the execution of a single handler. On destruction
// it reconciles the thread-private work count with the shared count — the
// handler just run accounts for one unit — and flushes any thread-private
// completions back to the shared queue.
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      // More work was started than finished; publish the surplus.
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      // Net work decreased; may stop the scheduler if the count hits zero.
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      // Make privately queued completions visible to other threads.
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};
// Construct the scheduler. The concurrency hint selects single-threaded
// mode (one_thread_) when it is exactly 1 or when it disables locking for
// the scheduler or reactor I/O; the same hint controls whether the mutex
// actually locks. When own_thread is true, an internal thread is spawned
// (with signals blocked) to run the scheduler, and one unit of outstanding
// work is recorded up front to keep that thread alive.
scheduler::scheduler(boost::asio::execution_context& ctx,
    int concurrency_hint, bool own_thread, get_task_func_type get_task)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(concurrency_hint == 1
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_IO, concurrency_hint)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)),
    task_(0),
    get_task_(get_task),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(concurrency_hint),
    thread_(0)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;

  if (own_thread)
  {
    // Block signals so the spawned thread does not receive them.
    ++outstanding_work_;
    boost::asio::detail::signal_blocker sb;
    thread_ = new boost::asio::detail::thread(thread_function(this));
  }
}
// Destroy the scheduler. If an internal thread was spawned, mark the
// scheduler shut down, stop all threads, then join and delete the thread.
scheduler::~scheduler()
{
  if (thread_)
  {
    mutex::scoped_lock lock(mutex_);
    shutdown_ = true;
    stop_all_threads(lock);
    lock.unlock(); // Must release the lock before joining to avoid deadlock.
    thread_->join();
    delete thread_;
  }
}
// Service shutdown: stop all threads, join the internal thread (if any),
// then destroy all queued operations without invoking their handlers.
void scheduler::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  if (thread_)
    stop_all_threads(lock);
  lock.unlock();

  // Join thread to ensure task operation is returned to queue.
  if (thread_)
  {
    thread_->join();
    delete thread_;
    thread_ = 0;
  }

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_) // The sentinel is a member, never destroyed.
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}
// Lazily create the task (reactor or io_uring service) on first use and
// enqueue its sentinel operation so a running thread will pick it up.
// No-op once the scheduler has been shut down or the task already exists.
void scheduler::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = get_task_(this->context());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock); // Releases the lock.
  }
}
  169. std::size_t scheduler::run(boost::system::error_code& ec)
  170. {
  171. ec = boost::system::error_code();
  172. if (outstanding_work_ == 0)
  173. {
  174. stop();
  175. return 0;
  176. }
  177. thread_info this_thread;
  178. this_thread.private_outstanding_work = 0;
  179. thread_call_stack::context ctx(this, this_thread);
  180. mutex::scoped_lock lock(mutex_);
  181. std::size_t n = 0;
  182. for (; do_run_one(lock, this_thread, ec); lock.lock())
  183. if (n != (std::numeric_limits<std::size_t>::max)())
  184. ++n;
  185. return n;
  186. }
  187. std::size_t scheduler::run_one(boost::system::error_code& ec)
  188. {
  189. ec = boost::system::error_code();
  190. if (outstanding_work_ == 0)
  191. {
  192. stop();
  193. return 0;
  194. }
  195. thread_info this_thread;
  196. this_thread.private_outstanding_work = 0;
  197. thread_call_stack::context ctx(this, this_thread);
  198. mutex::scoped_lock lock(mutex_);
  199. return do_run_one(lock, this_thread, ec);
  200. }
  201. std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
  202. {
  203. ec = boost::system::error_code();
  204. if (outstanding_work_ == 0)
  205. {
  206. stop();
  207. return 0;
  208. }
  209. thread_info this_thread;
  210. this_thread.private_outstanding_work = 0;
  211. thread_call_stack::context ctx(this, this_thread);
  212. mutex::scoped_lock lock(mutex_);
  213. return do_wait_one(lock, this_thread, usec, ec);
  214. }
  215. std::size_t scheduler::poll(boost::system::error_code& ec)
  216. {
  217. ec = boost::system::error_code();
  218. if (outstanding_work_ == 0)
  219. {
  220. stop();
  221. return 0;
  222. }
  223. thread_info this_thread;
  224. this_thread.private_outstanding_work = 0;
  225. thread_call_stack::context ctx(this, this_thread);
  226. mutex::scoped_lock lock(mutex_);
  227. #if defined(BOOST_ASIO_HAS_THREADS)
  228. // We want to support nested calls to poll() and poll_one(), so any handlers
  229. // that are already on a thread-private queue need to be put on to the main
  230. // queue now.
  231. if (one_thread_)
  232. if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
  233. op_queue_.push(outer_info->private_op_queue);
  234. #endif // defined(BOOST_ASIO_HAS_THREADS)
  235. std::size_t n = 0;
  236. for (; do_poll_one(lock, this_thread, ec); lock.lock())
  237. if (n != (std::numeric_limits<std::size_t>::max)())
  238. ++n;
  239. return n;
  240. }
  241. std::size_t scheduler::poll_one(boost::system::error_code& ec)
  242. {
  243. ec = boost::system::error_code();
  244. if (outstanding_work_ == 0)
  245. {
  246. stop();
  247. return 0;
  248. }
  249. thread_info this_thread;
  250. this_thread.private_outstanding_work = 0;
  251. thread_call_stack::context ctx(this, this_thread);
  252. mutex::scoped_lock lock(mutex_);
  253. #if defined(BOOST_ASIO_HAS_THREADS)
  254. // We want to support nested calls to poll() and poll_one(), so any handlers
  255. // that are already on a thread-private queue need to be put on to the main
  256. // queue now.
  257. if (one_thread_)
  258. if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
  259. op_queue_.push(outer_info->private_op_queue);
  260. #endif // defined(BOOST_ASIO_HAS_THREADS)
  261. return do_poll_one(lock, this_thread, ec);
  262. }
  263. void scheduler::stop()
  264. {
  265. mutex::scoped_lock lock(mutex_);
  266. stop_all_threads(lock);
  267. }
  268. bool scheduler::stopped() const
  269. {
  270. mutex::scoped_lock lock(mutex_);
  271. return stopped_;
  272. }
  273. void scheduler::restart()
  274. {
  275. mutex::scoped_lock lock(mutex_);
  276. stopped_ = false;
  277. }
  278. void scheduler::compensating_work_started()
  279. {
  280. thread_info_base* this_thread = thread_call_stack::contains(this);
  281. BOOST_ASIO_ASSUME(this_thread != 0); // Only called from inside scheduler.
  282. ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
  283. }
  284. bool scheduler::can_dispatch()
  285. {
  286. return thread_call_stack::contains(this) != 0;
  287. }
  288. void scheduler::capture_current_exception()
  289. {
  290. if (thread_info_base* this_thread = thread_call_stack::contains(this))
  291. this_thread->capture_current_exception();
  292. }
// Queue an operation for execution, accounting for it as newly started
// work. On threaded builds, when the caller is already running inside this
// scheduler and locking can be avoided (single-threaded mode, or the op is
// a continuation of the current handler), the operation goes on the
// thread-private queue without touching the mutex.
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      // Work is counted privately and reconciled later by work_cleanup.
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Shared path: count the work, enqueue, and wake a thread to run it.
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
// Queue a batch of n operations for execution, accounting for them as newly
// started work. Mirrors post_immediate_completion but amortizes the work
// count update and queue push over the whole batch.
void scheduler::post_immediate_completions(std::size_t n,
    op_queue<scheduler::operation>& ops, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      // Count privately; reconciled later by work_cleanup.
      static_cast<thread_info*>(this_thread)->private_outstanding_work
        += static_cast<long>(n);
      static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Shared path: bump the shared count once for the whole batch.
  increment(outstanding_work_, static_cast<long>(n));
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(ops);
  wake_one_thread_and_unlock(lock);
}
// Queue an operation whose work was already counted (deferred completion).
// In single-threaded mode, a caller inside the scheduler can use the
// thread-private queue without locking.
void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  // Shared path: enqueue and wake a thread to run it.
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
// Queue a batch of operations whose work was already counted. No-op for an
// empty batch; otherwise mirrors post_deferred_completion.
void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        // Caller is inside the scheduler: use the lock-free private queue.
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}
  372. void scheduler::do_dispatch(
  373. scheduler::operation* op)
  374. {
  375. work_started();
  376. mutex::scoped_lock lock(mutex_);
  377. op_queue_.push(op);
  378. wake_one_thread_and_unlock(lock);
  379. }
  380. void scheduler::abandon_operations(
  381. op_queue<scheduler::operation>& ops)
  382. {
  383. op_queue<scheduler::operation> ops2;
  384. ops2.push(ops);
  385. }
// Core loop: run handlers (or the task) until one handler has executed or
// the scheduler is stopped. Called with the lock held; always returns with
// it released. Returns 1 if a handler ran, 0 if stopped.
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        // The sentinel was dequeued: this thread becomes the task runner.
        task_interrupted_ = more_handlers;

        // If handlers remain, wake another thread to process them while
        // this one is busy inside the task.
        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        // Reinserts the task and flushes private completions on scope exit.
        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);
        this_thread.rethrow_pending_exception();

        return 1;
      }
    }
    else
    {
      // Nothing queued: sleep until another thread signals new work.
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
// Run at most one handler, waiting up to usec microseconds for one to
// become ready. Called with the lock held; always returns with it released.
// Returns 1 if a handler ran, 0 otherwise.
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    // Queue empty: block (once) for a signal that new work has arrived.
    wakeup_event_.clear(lock);
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    // This thread becomes the task runner.
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    task_interrupted_ = more_handlers;

    if (more_handlers && !one_thread_)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      // Reinserts the task and flushes private completions on scope exit.
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // Only the sentinel came back: nothing to run; let another thread
      // have a chance at the task.
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}
// Run at most one ready handler without blocking. Called with the lock
// held; always returns with it released. Returns 1 if a handler ran.
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    // Poll the task (timeout 0) to harvest any ready completions.
    op_queue_.pop();
    lock.unlock();

    {
      // Reinserts the task and flushes private completions on scope exit.
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // Only the sentinel came back: nothing ready to run right now.
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}
// Mark the scheduler stopped and wake every thread: signal all waiters on
// the wakeup event, and interrupt the task so the thread running it
// returns. The caller must hold the scheduler mutex (via lock).
void scheduler::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
// Wake one thread to process newly queued work, releasing the lock in all
// paths. Prefers signalling an idle thread waiting on the event; if none is
// waiting, interrupts the task so its runner notices the new work.
void scheduler::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}
// Return the default task for a context: the io_uring service when it is
// configured as the default backend, otherwise the reactor.
scheduler_task* scheduler::get_default_task(boost::asio::execution_context& ctx)
{
#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<io_uring_service>(ctx);
#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<reactor>(ctx);
#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}
  567. } // namespace detail
  568. } // namespace asio
  569. } // namespace boost
  570. #include <boost/asio/detail/pop_options.hpp>
  571. #endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP