//
// detail/impl/dev_poll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
#define ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_DEV_POLL)

#include "asio/detail/dev_poll_reactor.hpp"
#include "asio/detail/assert.hpp"
#include "asio/detail/scheduler.hpp"
#include "asio/detail/throw_error.hpp"
#include "asio/error.hpp"

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

dev_poll_reactor::dev_poll_reactor(asio::execution_context& ctx)
  : asio::detail::execution_context_service_base<dev_poll_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(),
    dev_poll_fd_(do_dev_poll_create()),
    interrupter_(),
    shutdown_(false)
{
  // Add the interrupter's descriptor to /dev/poll.
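  // Interest in a descriptor is registered by write()ing a pollfd struct to
  // the /dev/poll device; POLLIN interest on the interrupter lets other
  // threads wake the reactor out of a blocking DP_POLL ioctl.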
  ::pollfd ev = { 0, 0, 0 };
  ev.fd = interrupter_.read_descriptor();
  ev.events = POLLIN | POLLERR;
  ev.revents = 0;
  ::write(dev_poll_fd_, &ev, sizeof(ev));
}

dev_poll_reactor::~dev_poll_reactor()
{
  shutdown();
  ::close(dev_poll_fd_);
}

void dev_poll_reactor::shutdown()
{
  asio::detail::mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;
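  // Collect every outstanding operation so the scheduler can dispose of them
  // without invoking their completion handlers.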
  for (int i = 0; i < max_ops; ++i)
    op_queue_[i].get_all_operations(ops);

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void dev_poll_reactor::notify_fork(
    asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == asio::execution_context::fork_child)
  {
    detail::mutex::scoped_lock lock(mutex_);

    if (dev_poll_fd_ != -1)
      ::close(dev_poll_fd_);
    dev_poll_fd_ = -1;
    dev_poll_fd_ = do_dev_poll_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to /dev/poll.
    ::pollfd ev = { 0, 0, 0 };
    ev.fd = interrupter_.read_descriptor();
    ev.events = POLLIN | POLLERR;
    ev.revents = 0;
    ::write(dev_poll_fd_, &ev, sizeof(ev));

    // Re-register all descriptors with /dev/poll. The changes will be written
    // to the /dev/poll descriptor the next time the reactor is run.
    for (int i = 0; i < max_ops; ++i)
    {
      reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin();
      reactor_op_queue<socket_type>::iterator end = op_queue_[i].end();
      for (; iter != end; ++iter)
      {
        ::pollfd& pending_ev = add_pending_event_change(iter->first);
        pending_ev.events |= POLLERR | POLLHUP;
        switch (i)
        {
        case read_op: pending_ev.events |= POLLIN; break;
        case write_op: pending_ev.events |= POLLOUT; break;
        case except_op: pending_ev.events |= POLLPRI; break;
        default: break;
        }
      }
    }
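
    // Wake the child's reactor so the queued changes are flushed promptly.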
    interrupter_.interrupt();
  }
}

void dev_poll_reactor::init_task()
{
  scheduler_.init_task();
}

int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&)
{
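  // No-op: with /dev/poll there is no up-front per-descriptor registration;
  // interest is added lazily when an operation is started.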
  return 0;
}

int dev_poll_reactor::register_internal_descriptor(int op_type,
    socket_type descriptor, per_descriptor_data&, reactor_op* op)
{
  asio::detail::mutex::scoped_lock lock(mutex_);

  op_queue_[op_type].enqueue_operation(descriptor, op);

  ::pollfd& ev = add_pending_event_change(descriptor);
  ev.events = POLLERR | POLLHUP;
  switch (op_type)
  {
  case read_op: ev.events |= POLLIN; break;
  case write_op: ev.events |= POLLOUT; break;
  case except_op: ev.events |= POLLPRI; break;
  default: break;
  }
  interrupter_.interrupt();

  return 0;
}

void dev_poll_reactor::move_descriptor(socket_type,
    dev_poll_reactor::per_descriptor_data&,
    dev_poll_reactor::per_descriptor_data&)
{
}

void dev_poll_reactor::call_post_immediate_completion(
    operation* op, bool is_continuation, const void* self)
{
  static_cast<const dev_poll_reactor*>(self)->post_immediate_completion(
      op, is_continuation);
}

void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, reactor_op* op,
    bool is_continuation, bool allow_speculative,
    void (*on_immediate)(operation*, bool, const void*),
    const void* immediate_arg)
{
  asio::detail::mutex::scoped_lock lock(mutex_);

  if (shutdown_)
  {
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }
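
  // Attempt a speculative run of the operation. This is only safe when no
  // operation of the same type is already queued for the descriptor, and a
  // read must not jump ahead of a pending exception operation, so that any
  // out-of-band data is seen first.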
  if (allow_speculative)
  {
    if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor))
    {
      if (!op_queue_[op_type].has_operation(descriptor))
      {
        if (op->perform())
        {
          lock.unlock();
          on_immediate(op, is_continuation, immediate_arg);
          return;
        }
      }
    }
  }

  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
  scheduler_.work_started();
  if (first)
  {
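    // This is the first operation of its type for the descriptor. Rebuild the
    // interest set from all queued operation types and wake the reactor so
    // the pending change is flushed to /dev/poll.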
    ::pollfd& ev = add_pending_event_change(descriptor);
    ev.events = POLLERR | POLLHUP;
    if (op_type == read_op
        || op_queue_[read_op].has_operation(descriptor))
      ev.events |= POLLIN;
    if (op_type == write_op
        || op_queue_[write_op].has_operation(descriptor))
      ev.events |= POLLOUT;
    if (op_type == except_op
        || op_queue_[except_op].has_operation(descriptor))
      ev.events |= POLLPRI;
    interrupter_.interrupt();
  }
}

void dev_poll_reactor::cancel_ops(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&)
{
  asio::detail::mutex::scoped_lock lock(mutex_);
  cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
}

void dev_poll_reactor::cancel_ops_by_key(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&,
    int op_type, void* cancellation_key)
{
  asio::detail::mutex::scoped_lock lock(mutex_);
  op_queue<operation> ops;
  bool need_interrupt = op_queue_[op_type].cancel_operations_by_key(
      descriptor, ops, cancellation_key, asio::error::operation_aborted);
  scheduler_.post_deferred_completions(ops);
  if (need_interrupt)
    interrupter_.interrupt();
}

void dev_poll_reactor::deregister_descriptor(socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, bool)
{
  asio::detail::mutex::scoped_lock lock(mutex_);

  // Remove the descriptor from /dev/poll.
  ::pollfd& ev = add_pending_event_change(descriptor);
  ev.events = POLLREMOVE;
  interrupter_.interrupt();

  // Cancel any outstanding operations associated with the descriptor.
  cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
}

void dev_poll_reactor::deregister_internal_descriptor(
    socket_type descriptor, dev_poll_reactor::per_descriptor_data&)
{
  asio::detail::mutex::scoped_lock lock(mutex_);

  // Remove the descriptor from /dev/poll. Since this function is only called
  // during a fork, we can apply the change immediately.
  ::pollfd ev = { 0, 0, 0 };
  ev.fd = descriptor;
  ev.events = POLLREMOVE;
  ev.revents = 0;
  ::write(dev_poll_fd_, &ev, sizeof(ev));

  // Destroy all operations associated with the descriptor.
  op_queue<operation> ops;
  asio::error_code ec;
  for (int i = 0; i < max_ops; ++i)
    op_queue_[i].cancel_operations(descriptor, ops, ec);
}

void dev_poll_reactor::cleanup_descriptor_data(
    dev_poll_reactor::per_descriptor_data&)
{
}

void dev_poll_reactor::run(long usec, op_queue<operation>& ops)
{
  asio::detail::mutex::scoped_lock lock(mutex_);

  // We can return immediately if there's no work to do and the reactor is
  // not supposed to block.
  if (usec == 0 && op_queue_[read_op].empty() && op_queue_[write_op].empty()
      && op_queue_[except_op].empty() && timer_queues_.all_empty())
    return;

  // Write the pending event registration changes to the /dev/poll descriptor.
  std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
  if (events_size > 0)
  {
    errno = 0;
    int result = ::write(dev_poll_fd_,
        &pending_event_changes_[0], events_size);
    if (result != static_cast<int>(events_size))
    {
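      // A short or failed write leaves the registration state unknown, so
      // fail every operation queued for the affected descriptors.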
      asio::error_code ec = asio::error_code(
          errno, asio::error::get_system_category());
      for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
      {
        int descriptor = pending_event_changes_[i].fd;
        for (int j = 0; j < max_ops; ++j)
          op_queue_[j].cancel_operations(descriptor, ops, ec);
      }
    }
    pending_event_changes_.clear();
    pending_event_change_index_.clear();
  }

  // Calculate timeout.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
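    // Round the requested wait up to whole milliseconds, then clamp it to the
    // earliest timer expiry.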
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    timeout = get_timeout(timeout);
  }
  lock.unlock();

  // Block on the /dev/poll descriptor.
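  // The DP_POLL ioctl retrieves at most 128 ready descriptors per reactor
  // iteration; a dp_timeout of -1 means wait indefinitely.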
  ::pollfd events[128] = { { 0, 0, 0 } };
  ::dvpoll dp = { 0, 0, 0 };
  dp.dp_fds = events;
  dp.dp_nfds = 128;
  dp.dp_timeout = timeout;
  int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp);

  lock.lock();

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    int descriptor = events[i].fd;
    if (descriptor == interrupter_.read_descriptor())
    {
      interrupter_.reset();
    }
    else
    {
      bool more_reads = false;
      bool more_writes = false;
      bool more_except = false;

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
      if (events[i].events & (POLLPRI | POLLERR | POLLHUP))
        more_except =
          op_queue_[except_op].perform_operations(descriptor, ops);
      else
        more_except = op_queue_[except_op].has_operation(descriptor);

      if (events[i].events & (POLLIN | POLLERR | POLLHUP))
        more_reads = op_queue_[read_op].perform_operations(descriptor, ops);
      else
        more_reads = op_queue_[read_op].has_operation(descriptor);

      if (events[i].events & (POLLOUT | POLLERR | POLLHUP))
        more_writes = op_queue_[write_op].perform_operations(descriptor, ops);
      else
        more_writes = op_queue_[write_op].has_operation(descriptor);

      if ((events[i].events & (POLLERR | POLLHUP)) != 0
          && !more_except && !more_reads && !more_writes)
      {
        // If we have an event and no operations associated with the
        // descriptor then we need to delete the descriptor from /dev/poll.
        // The poll operation can produce POLLHUP or POLLERR events when there
        // is no operation pending, so if we do not remove the descriptor we
        // can end up in a tight polling loop.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLREMOVE;
        ev.revents = 0;
        ::write(dev_poll_fd_, &ev, sizeof(ev));
      }
      else
      {
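        // Update the descriptor's interest set to cover whichever operation
        // types still have queued work, writing the change to /dev/poll
        // immediately.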
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLERR | POLLHUP;
        if (more_reads)
          ev.events |= POLLIN;
        if (more_writes)
          ev.events |= POLLOUT;
        if (more_except)
          ev.events |= POLLPRI;
        ev.revents = 0;
        int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
        if (result != sizeof(ev))
        {
          asio::error_code ec(errno,
              asio::error::get_system_category());
          for (int j = 0; j < max_ops; ++j)
            op_queue_[j].cancel_operations(descriptor, ops, ec);
        }
      }
    }
  }
  timer_queues_.get_ready_timers(ops);
}

void dev_poll_reactor::interrupt()
{
  interrupter_.interrupt();
}

int dev_poll_reactor::do_dev_poll_create()
{
  int fd = ::open("/dev/poll", O_RDWR);
  if (fd == -1)
  {
    asio::error_code ec(errno,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "/dev/poll");
  }
  return fd;
}

void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

int dev_poll_reactor::get_timeout(int msec)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  const int max_msec = 5 * 60 * 1000;
  return timer_queues_.wait_duration_msec(
      (msec < 0 || max_msec < msec) ? max_msec : msec);
}

void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor,
    const asio::error_code& ec)
{
  bool need_interrupt = false;
  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
    need_interrupt = op_queue_[i].cancel_operations(
        descriptor, ops, ec) || need_interrupt;
  scheduler_.post_deferred_completions(ops);
  if (need_interrupt)
    interrupter_.interrupt();
}

::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor)
{
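  // Each descriptor gets at most one pending pollfd entry; repeated changes
  // made before the next flush are coalesced into that entry.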
  hash_map<int, std::size_t>::iterator iter
    = pending_event_change_index_.find(descriptor);
  if (iter == pending_event_change_index_.end())
  {
    std::size_t index = pending_event_changes_.size();
    pending_event_changes_.reserve(pending_event_changes_.size() + 1);
    pending_event_change_index_.insert(std::make_pair(descriptor, index));
    pending_event_changes_.push_back(::pollfd());
    pending_event_changes_[index].fd = descriptor;
    pending_event_changes_[index].revents = 0;
    return pending_event_changes_[index];
  }
  else
  {
    return pending_event_changes_[iter->second];
  }
}

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_DEV_POLL)

#endif // ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP