win_iocp_io_context.ipp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. //
  2. // detail/impl/win_iocp_io_context.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
  11. #define ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #if defined(ASIO_HAS_IOCP)
  17. #include "asio/error.hpp"
  18. #include "asio/detail/cstdint.hpp"
  19. #include "asio/detail/handler_alloc_helpers.hpp"
  20. #include "asio/detail/limits.hpp"
  21. #include "asio/detail/thread.hpp"
  22. #include "asio/detail/throw_error.hpp"
  23. #include "asio/detail/win_iocp_io_context.hpp"
  24. #include "asio/detail/push_options.hpp"
  25. namespace asio {
  26. namespace detail {
  27. struct win_iocp_io_context::thread_function
  28. {
  29. explicit thread_function(win_iocp_io_context* s)
  30. : this_(s)
  31. {
  32. }
  33. void operator()()
  34. {
  35. asio::error_code ec;
  36. this_->run(ec);
  37. }
  38. win_iocp_io_context* this_;
  39. };
  40. struct win_iocp_io_context::work_finished_on_block_exit
  41. {
  42. ~work_finished_on_block_exit() noexcept(false)
  43. {
  44. io_context_->work_finished();
  45. }
  46. win_iocp_io_context* io_context_;
  47. };
  48. struct win_iocp_io_context::timer_thread_function
  49. {
  50. void operator()()
  51. {
  52. while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
  53. {
  54. if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
  55. INFINITE) == WAIT_OBJECT_0)
  56. {
  57. ::InterlockedExchange(&io_context_->dispatch_required_, 1);
  58. ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
  59. 0, wake_for_dispatch, 0);
  60. }
  61. }
  62. }
  63. win_iocp_io_context* io_context_;
  64. };
  65. win_iocp_io_context::win_iocp_io_context(
  66. asio::execution_context& ctx, int concurrency_hint, bool own_thread)
  67. : execution_context_service_base<win_iocp_io_context>(ctx),
  68. iocp_(),
  69. outstanding_work_(0),
  70. stopped_(0),
  71. stop_event_posted_(0),
  72. shutdown_(0),
  73. gqcs_timeout_(get_gqcs_timeout()),
  74. dispatch_required_(0),
  75. concurrency_hint_(concurrency_hint)
  76. {
  77. ASIO_HANDLER_TRACKING_INIT;
  78. iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
  79. static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
  80. if (!iocp_.handle)
  81. {
  82. DWORD last_error = ::GetLastError();
  83. asio::error_code ec(last_error,
  84. asio::error::get_system_category());
  85. asio::detail::throw_error(ec, "iocp");
  86. }
  87. if (own_thread)
  88. {
  89. ::InterlockedIncrement(&outstanding_work_);
  90. thread_.reset(new asio::detail::thread(thread_function(this)));
  91. }
  92. }
  93. win_iocp_io_context::~win_iocp_io_context()
  94. {
  95. if (thread_.get())
  96. {
  97. stop();
  98. thread_->join();
  99. thread_.reset();
  100. }
  101. }
  102. void win_iocp_io_context::shutdown()
  103. {
  104. ::InterlockedExchange(&shutdown_, 1);
  105. if (timer_thread_.get())
  106. {
  107. LARGE_INTEGER timeout;
  108. timeout.QuadPart = 1;
  109. ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
  110. }
  111. if (thread_.get())
  112. {
  113. stop();
  114. thread_->join();
  115. thread_.reset();
  116. ::InterlockedDecrement(&outstanding_work_);
  117. }
  118. while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
  119. {
  120. op_queue<win_iocp_operation> ops;
  121. timer_queues_.get_all_timers(ops);
  122. ops.push(completed_ops_);
  123. if (!ops.empty())
  124. {
  125. while (win_iocp_operation* op = ops.front())
  126. {
  127. ops.pop();
  128. ::InterlockedDecrement(&outstanding_work_);
  129. op->destroy();
  130. }
  131. }
  132. else
  133. {
  134. DWORD bytes_transferred = 0;
  135. dword_ptr_t completion_key = 0;
  136. LPOVERLAPPED overlapped = 0;
  137. ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
  138. &completion_key, &overlapped, gqcs_timeout_);
  139. if (overlapped)
  140. {
  141. ::InterlockedDecrement(&outstanding_work_);
  142. static_cast<win_iocp_operation*>(overlapped)->destroy();
  143. }
  144. }
  145. }
  146. if (timer_thread_.get())
  147. {
  148. timer_thread_->join();
  149. timer_thread_.reset();
  150. }
  151. }
  152. asio::error_code win_iocp_io_context::register_handle(
  153. HANDLE handle, asio::error_code& ec)
  154. {
  155. if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
  156. {
  157. DWORD last_error = ::GetLastError();
  158. ec = asio::error_code(last_error,
  159. asio::error::get_system_category());
  160. }
  161. else
  162. {
  163. ec = asio::error_code();
  164. }
  165. return ec;
  166. }
  167. size_t win_iocp_io_context::run(asio::error_code& ec)
  168. {
  169. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  170. {
  171. stop();
  172. ec = asio::error_code();
  173. return 0;
  174. }
  175. win_iocp_thread_info this_thread;
  176. thread_call_stack::context ctx(this, this_thread);
  177. size_t n = 0;
  178. while (do_one(INFINITE, this_thread, ec))
  179. if (n != (std::numeric_limits<size_t>::max)())
  180. ++n;
  181. return n;
  182. }
  183. size_t win_iocp_io_context::run_one(asio::error_code& ec)
  184. {
  185. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  186. {
  187. stop();
  188. ec = asio::error_code();
  189. return 0;
  190. }
  191. win_iocp_thread_info this_thread;
  192. thread_call_stack::context ctx(this, this_thread);
  193. return do_one(INFINITE, this_thread, ec);
  194. }
  195. size_t win_iocp_io_context::wait_one(long usec, asio::error_code& ec)
  196. {
  197. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  198. {
  199. stop();
  200. ec = asio::error_code();
  201. return 0;
  202. }
  203. win_iocp_thread_info this_thread;
  204. thread_call_stack::context ctx(this, this_thread);
  205. return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec);
  206. }
  207. size_t win_iocp_io_context::poll(asio::error_code& ec)
  208. {
  209. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  210. {
  211. stop();
  212. ec = asio::error_code();
  213. return 0;
  214. }
  215. win_iocp_thread_info this_thread;
  216. thread_call_stack::context ctx(this, this_thread);
  217. size_t n = 0;
  218. while (do_one(0, this_thread, ec))
  219. if (n != (std::numeric_limits<size_t>::max)())
  220. ++n;
  221. return n;
  222. }
  223. size_t win_iocp_io_context::poll_one(asio::error_code& ec)
  224. {
  225. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  226. {
  227. stop();
  228. ec = asio::error_code();
  229. return 0;
  230. }
  231. win_iocp_thread_info this_thread;
  232. thread_call_stack::context ctx(this, this_thread);
  233. return do_one(0, this_thread, ec);
  234. }
  235. void win_iocp_io_context::stop()
  236. {
  237. if (::InterlockedExchange(&stopped_, 1) == 0)
  238. {
  239. if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
  240. {
  241. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
  242. {
  243. DWORD last_error = ::GetLastError();
  244. asio::error_code ec(last_error,
  245. asio::error::get_system_category());
  246. asio::detail::throw_error(ec, "pqcs");
  247. }
  248. }
  249. }
  250. }
  251. bool win_iocp_io_context::can_dispatch()
  252. {
  253. return thread_call_stack::contains(this) != 0;
  254. }
  255. void win_iocp_io_context::capture_current_exception()
  256. {
  257. if (thread_info_base* this_thread = thread_call_stack::contains(this))
  258. this_thread->capture_current_exception();
  259. }
  260. void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
  261. {
  262. // Flag the operation as ready.
  263. op->ready_ = 1;
  264. // Enqueue the operation on the I/O completion port.
  265. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
  266. {
  267. // Out of resources. Put on completed queue instead.
  268. mutex::scoped_lock lock(dispatch_mutex_);
  269. completed_ops_.push(op);
  270. ::InterlockedExchange(&dispatch_required_, 1);
  271. }
  272. }
  273. void win_iocp_io_context::post_deferred_completions(
  274. op_queue<win_iocp_operation>& ops)
  275. {
  276. while (win_iocp_operation* op = ops.front())
  277. {
  278. ops.pop();
  279. // Flag the operation as ready.
  280. op->ready_ = 1;
  281. // Enqueue the operation on the I/O completion port.
  282. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
  283. {
  284. // Out of resources. Put on completed queue instead.
  285. mutex::scoped_lock lock(dispatch_mutex_);
  286. completed_ops_.push(op);
  287. completed_ops_.push(ops);
  288. ::InterlockedExchange(&dispatch_required_, 1);
  289. }
  290. }
  291. }
  292. void win_iocp_io_context::abandon_operations(
  293. op_queue<win_iocp_operation>& ops)
  294. {
  295. while (win_iocp_operation* op = ops.front())
  296. {
  297. ops.pop();
  298. ::InterlockedDecrement(&outstanding_work_);
  299. op->destroy();
  300. }
  301. }
  302. void win_iocp_io_context::on_pending(win_iocp_operation* op)
  303. {
  304. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
  305. {
  306. // Enqueue the operation on the I/O completion port.
  307. if (!::PostQueuedCompletionStatus(iocp_.handle,
  308. 0, overlapped_contains_result, op))
  309. {
  310. // Out of resources. Put on completed queue instead.
  311. mutex::scoped_lock lock(dispatch_mutex_);
  312. completed_ops_.push(op);
  313. ::InterlockedExchange(&dispatch_required_, 1);
  314. }
  315. }
  316. }
  317. void win_iocp_io_context::on_completion(win_iocp_operation* op,
  318. DWORD last_error, DWORD bytes_transferred)
  319. {
  320. // Flag that the operation is ready for invocation.
  321. op->ready_ = 1;
  322. // Store results in the OVERLAPPED structure.
  323. op->Internal = reinterpret_cast<ulong_ptr_t>(
  324. &asio::error::get_system_category());
  325. op->Offset = last_error;
  326. op->OffsetHigh = bytes_transferred;
  327. // Enqueue the operation on the I/O completion port.
  328. if (!::PostQueuedCompletionStatus(iocp_.handle,
  329. 0, overlapped_contains_result, op))
  330. {
  331. // Out of resources. Put on completed queue instead.
  332. mutex::scoped_lock lock(dispatch_mutex_);
  333. completed_ops_.push(op);
  334. ::InterlockedExchange(&dispatch_required_, 1);
  335. }
  336. }
  337. void win_iocp_io_context::on_completion(win_iocp_operation* op,
  338. const asio::error_code& ec, DWORD bytes_transferred)
  339. {
  340. // Flag that the operation is ready for invocation.
  341. op->ready_ = 1;
  342. // Store results in the OVERLAPPED structure.
  343. op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
  344. op->Offset = ec.value();
  345. op->OffsetHigh = bytes_transferred;
  346. // Enqueue the operation on the I/O completion port.
  347. if (!::PostQueuedCompletionStatus(iocp_.handle,
  348. 0, overlapped_contains_result, op))
  349. {
  350. // Out of resources. Put on completed queue instead.
  351. mutex::scoped_lock lock(dispatch_mutex_);
  352. completed_ops_.push(op);
  353. ::InterlockedExchange(&dispatch_required_, 1);
  354. }
  355. }
  356. size_t win_iocp_io_context::do_one(DWORD msec,
  357. win_iocp_thread_info& this_thread, asio::error_code& ec)
  358. {
  359. for (;;)
  360. {
  361. // Try to acquire responsibility for dispatching timers and completed ops.
  362. if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
  363. {
  364. mutex::scoped_lock lock(dispatch_mutex_);
  365. // Dispatch pending timers and operations.
  366. op_queue<win_iocp_operation> ops;
  367. ops.push(completed_ops_);
  368. timer_queues_.get_ready_timers(ops);
  369. post_deferred_completions(ops);
  370. update_timeout();
  371. }
  372. // Get the next operation from the queue.
  373. DWORD bytes_transferred = 0;
  374. dword_ptr_t completion_key = 0;
  375. LPOVERLAPPED overlapped = 0;
  376. ::SetLastError(0);
  377. BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
  378. &bytes_transferred, &completion_key, &overlapped,
  379. msec < gqcs_timeout_ ? msec : gqcs_timeout_);
  380. DWORD last_error = ::GetLastError();
  381. if (overlapped)
  382. {
  383. win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
  384. asio::error_code result_ec(last_error,
  385. asio::error::get_system_category());
  386. // We may have been passed the last_error and bytes_transferred in the
  387. // OVERLAPPED structure itself.
  388. if (completion_key == overlapped_contains_result)
  389. {
  390. result_ec = asio::error_code(static_cast<int>(op->Offset),
  391. *reinterpret_cast<asio::error_category*>(op->Internal));
  392. bytes_transferred = op->OffsetHigh;
  393. }
  394. // Otherwise ensure any result has been saved into the OVERLAPPED
  395. // structure.
  396. else
  397. {
  398. op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
  399. op->Offset = result_ec.value();
  400. op->OffsetHigh = bytes_transferred;
  401. }
  402. // Dispatch the operation only if ready. The operation may not be ready
  403. // if the initiating function (e.g. a call to WSARecv) has not yet
  404. // returned. This is because the initiating function still wants access
  405. // to the operation's OVERLAPPED structure.
  406. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
  407. {
  408. // Ensure the count of outstanding work is decremented on block exit.
  409. work_finished_on_block_exit on_exit = { this };
  410. (void)on_exit;
  411. op->complete(this, result_ec, bytes_transferred);
  412. this_thread.rethrow_pending_exception();
  413. ec = asio::error_code();
  414. return 1;
  415. }
  416. }
  417. else if (!ok)
  418. {
  419. if (last_error != WAIT_TIMEOUT)
  420. {
  421. ec = asio::error_code(last_error,
  422. asio::error::get_system_category());
  423. return 0;
  424. }
  425. // If we're waiting indefinitely we need to keep going until we get a
  426. // real handler.
  427. if (msec == INFINITE)
  428. continue;
  429. ec = asio::error_code();
  430. return 0;
  431. }
  432. else if (completion_key == wake_for_dispatch)
  433. {
  434. // We have been woken up to try to acquire responsibility for dispatching
  435. // timers and completed operations.
  436. }
  437. else
  438. {
  439. // Indicate that there is no longer an in-flight stop event.
  440. ::InterlockedExchange(&stop_event_posted_, 0);
  441. // The stopped_ flag is always checked to ensure that any leftover
  442. // stop events from a previous run invocation are ignored.
  443. if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
  444. {
  445. // Wake up next thread that is blocked on GetQueuedCompletionStatus.
  446. if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
  447. {
  448. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
  449. {
  450. last_error = ::GetLastError();
  451. ec = asio::error_code(last_error,
  452. asio::error::get_system_category());
  453. return 0;
  454. }
  455. }
  456. ec = asio::error_code();
  457. return 0;
  458. }
  459. }
  460. }
  461. }
  462. DWORD win_iocp_io_context::get_gqcs_timeout()
  463. {
  464. #if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
  465. OSVERSIONINFOEX osvi;
  466. ZeroMemory(&osvi, sizeof(osvi));
  467. osvi.dwOSVersionInfoSize = sizeof(osvi);
  468. osvi.dwMajorVersion = 6ul;
  469. const uint64_t condition_mask = ::VerSetConditionMask(
  470. 0, VER_MAJORVERSION, VER_GREATER_EQUAL);
  471. if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
  472. return INFINITE;
  473. return default_gqcs_timeout;
  474. #else // !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
  475. return INFINITE;
  476. #endif // !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
  477. }
  478. void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
  479. {
  480. mutex::scoped_lock lock(dispatch_mutex_);
  481. timer_queues_.insert(&queue);
  482. if (!waitable_timer_.handle)
  483. {
  484. waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
  485. if (waitable_timer_.handle == 0)
  486. {
  487. DWORD last_error = ::GetLastError();
  488. asio::error_code ec(last_error,
  489. asio::error::get_system_category());
  490. asio::detail::throw_error(ec, "timer");
  491. }
  492. LARGE_INTEGER timeout;
  493. timeout.QuadPart = -max_timeout_usec;
  494. timeout.QuadPart *= 10;
  495. ::SetWaitableTimer(waitable_timer_.handle,
  496. &timeout, max_timeout_msec, 0, 0, FALSE);
  497. }
  498. if (!timer_thread_.get())
  499. {
  500. timer_thread_function thread_function = { this };
  501. timer_thread_.reset(new thread(thread_function, 65536));
  502. }
  503. }
  504. void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
  505. {
  506. mutex::scoped_lock lock(dispatch_mutex_);
  507. timer_queues_.erase(&queue);
  508. }
  509. void win_iocp_io_context::update_timeout()
  510. {
  511. if (timer_thread_.get())
  512. {
  513. // There's no point updating the waitable timer if the new timeout period
  514. // exceeds the maximum timeout. In that case, we might as well wait for the
  515. // existing period of the timer to expire.
  516. long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
  517. if (timeout_usec < max_timeout_usec)
  518. {
  519. LARGE_INTEGER timeout;
  520. timeout.QuadPart = -timeout_usec;
  521. timeout.QuadPart *= 10;
  522. ::SetWaitableTimer(waitable_timer_.handle,
  523. &timeout, max_timeout_msec, 0, 0, FALSE);
  524. }
  525. }
  526. }
  527. } // namespace detail
  528. } // namespace asio
  529. #include "asio/detail/pop_options.hpp"
  530. #endif // defined(ASIO_HAS_IOCP)
  531. #endif // ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP