reactor_op_queue.hpp 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. //
  2. // detail/reactor_op_queue.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
  11. #define ASIO_DETAIL_REACTOR_OP_QUEUE_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include "asio/detail/config.hpp"
  16. #include "asio/detail/hash_map.hpp"
  17. #include "asio/detail/noncopyable.hpp"
  18. #include "asio/detail/op_queue.hpp"
  19. #include "asio/detail/reactor_op.hpp"
  20. #include "asio/error.hpp"
  21. #include "asio/detail/push_options.hpp"
  22. namespace asio {
  23. namespace detail {
  24. template <typename Descriptor>
  25. class reactor_op_queue
  26. : private noncopyable
  27. {
  28. public:
  29. typedef Descriptor key_type;
  30. struct mapped_type : op_queue<reactor_op>
  31. {
  32. mapped_type() {}
  33. mapped_type(const mapped_type&) {}
  34. void operator=(const mapped_type&) {}
  35. };
  36. typedef typename hash_map<key_type, mapped_type>::value_type value_type;
  37. typedef typename hash_map<key_type, mapped_type>::iterator iterator;
  38. // Constructor.
  39. reactor_op_queue()
  40. : operations_()
  41. {
  42. }
  43. // Obtain iterators to all registered descriptors.
  44. iterator begin() { return operations_.begin(); }
  45. iterator end() { return operations_.end(); }
  46. // Add a new operation to the queue. Returns true if this is the only
  47. // operation for the given descriptor, in which case the reactor's event
  48. // demultiplexing function call may need to be interrupted and restarted.
  49. bool enqueue_operation(Descriptor descriptor, reactor_op* op)
  50. {
  51. std::pair<iterator, bool> entry =
  52. operations_.insert(value_type(descriptor, mapped_type()));
  53. entry.first->second.push(op);
  54. return entry.second;
  55. }
  56. // Cancel all operations associated with the descriptor identified by the
  57. // supplied iterator. Any operations pending for the descriptor will be
  58. // cancelled. Returns true if any operations were cancelled, in which case
  59. // the reactor's event demultiplexing function may need to be interrupted and
  60. // restarted.
  61. bool cancel_operations(iterator i, op_queue<operation>& ops,
  62. const asio::error_code& ec =
  63. asio::error::operation_aborted)
  64. {
  65. if (i != operations_.end())
  66. {
  67. while (reactor_op* op = i->second.front())
  68. {
  69. op->ec_ = ec;
  70. i->second.pop();
  71. ops.push(op);
  72. }
  73. operations_.erase(i);
  74. return true;
  75. }
  76. return false;
  77. }
  78. // Cancel all operations associated with the descriptor. Any operations
  79. // pending for the descriptor will be cancelled. Returns true if any
  80. // operations were cancelled, in which case the reactor's event
  81. // demultiplexing function may need to be interrupted and restarted.
  82. bool cancel_operations(Descriptor descriptor, op_queue<operation>& ops,
  83. const asio::error_code& ec =
  84. asio::error::operation_aborted)
  85. {
  86. return this->cancel_operations(operations_.find(descriptor), ops, ec);
  87. }
  88. // Cancel operations associated with the descriptor identified by the
  89. // supplied iterator, and the specified cancellation key. Any operations
  90. // pending for the descriptor with the key will be cancelled. Returns true if
  91. // any operations were cancelled, in which case the reactor's event
  92. // demultiplexing function may need to be interrupted and restarted.
  93. bool cancel_operations_by_key(iterator i, op_queue<operation>& ops,
  94. void* cancellation_key, const asio::error_code& ec =
  95. asio::error::operation_aborted)
  96. {
  97. bool result = false;
  98. if (i != operations_.end())
  99. {
  100. op_queue<reactor_op> other_ops;
  101. while (reactor_op* op = i->second.front())
  102. {
  103. i->second.pop();
  104. if (op->cancellation_key_ == cancellation_key)
  105. {
  106. op->ec_ = ec;
  107. ops.push(op);
  108. result = true;
  109. }
  110. else
  111. other_ops.push(op);
  112. }
  113. i->second.push(other_ops);
  114. if (i->second.empty())
  115. operations_.erase(i);
  116. }
  117. return result;
  118. }
  119. // Cancel all operations associated with the descriptor. Any operations
  120. // pending for the descriptor will be cancelled. Returns true if any
  121. // operations were cancelled, in which case the reactor's event
  122. // demultiplexing function may need to be interrupted and restarted.
  123. bool cancel_operations_by_key(Descriptor descriptor, op_queue<operation>& ops,
  124. void* cancellation_key, const asio::error_code& ec =
  125. asio::error::operation_aborted)
  126. {
  127. return this->cancel_operations_by_key(
  128. operations_.find(descriptor), ops, cancellation_key, ec);
  129. }
  130. // Whether there are no operations in the queue.
  131. bool empty() const
  132. {
  133. return operations_.empty();
  134. }
  135. // Determine whether there are any operations associated with the descriptor.
  136. bool has_operation(Descriptor descriptor) const
  137. {
  138. return operations_.find(descriptor) != operations_.end();
  139. }
  140. // Perform the operations corresponding to the descriptor identified by the
  141. // supplied iterator. Returns true if there are still unfinished operations
  142. // queued for the descriptor.
  143. bool perform_operations(iterator i, op_queue<operation>& ops)
  144. {
  145. if (i != operations_.end())
  146. {
  147. while (reactor_op* op = i->second.front())
  148. {
  149. if (op->perform())
  150. {
  151. i->second.pop();
  152. ops.push(op);
  153. }
  154. else
  155. {
  156. return true;
  157. }
  158. }
  159. operations_.erase(i);
  160. }
  161. return false;
  162. }
  163. // Perform the operations corresponding to the descriptor. Returns true if
  164. // there are still unfinished operations queued for the descriptor.
  165. bool perform_operations(Descriptor descriptor, op_queue<operation>& ops)
  166. {
  167. return this->perform_operations(operations_.find(descriptor), ops);
  168. }
  169. // Get all operations owned by the queue.
  170. void get_all_operations(op_queue<operation>& ops)
  171. {
  172. iterator i = operations_.begin();
  173. while (i != operations_.end())
  174. {
  175. iterator op_iter = i++;
  176. ops.push(op_iter->second);
  177. operations_.erase(op_iter);
  178. }
  179. }
  180. private:
  181. // The operations that are currently executing asynchronously.
  182. hash_map<key_type, mapped_type> operations_;
  183. };
  184. } // namespace detail
  185. } // namespace asio
  186. #include "asio/detail/pop_options.hpp"
  187. #endif // ASIO_DETAIL_REACTOR_OP_QUEUE_HPP