mpmc_blocking_q.h 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
  2. // Distributed under the MIT License (http://opensource.org/licenses/MIT)
  3. #pragma once
  4. // multi producer-multi consumer blocking queue.
  5. // enqueue(..) - will block until room found to put the new message.
  6. // enqueue_nowait(..) - will return immediately; if no room is left, the
  7. // oldest message in the queue is overrun (nothing is returned).
  8. // dequeue_for(..) - will block until the queue is not empty or the timeout
  9. // has passed. dequeue(..) - will block until the queue is not empty.
  10. #include <spdlog/details/circular_q.h>
  11. #include <condition_variable>
  12. #include <mutex>
  13. namespace spdlog {
  14. namespace details {
  15. template<typename T>
  16. class mpmc_blocking_queue
  17. {
  18. public:
  19. using item_type = T;
  20. explicit mpmc_blocking_queue(size_t max_items)
  21. : q_(max_items)
  22. {}
  23. #ifndef __MINGW32__
  24. // try to enqueue and block if no room left
  25. void enqueue(T &&item)
  26. {
  27. {
  28. std::unique_lock<std::mutex> lock(queue_mutex_);
  29. pop_cv_.wait(lock, [this] { return !this->q_.full(); });
  30. q_.push_back(std::move(item));
  31. }
  32. push_cv_.notify_one();
  33. }
  34. // enqueue immediately. overrun oldest message in the queue if no room left.
  35. void enqueue_nowait(T &&item)
  36. {
  37. {
  38. std::unique_lock<std::mutex> lock(queue_mutex_);
  39. q_.push_back(std::move(item));
  40. }
  41. push_cv_.notify_one();
  42. }
  43. // dequeue with a timeout.
  44. // Return true, if succeeded dequeue item, false otherwise
  45. bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
  46. {
  47. {
  48. std::unique_lock<std::mutex> lock(queue_mutex_);
  49. if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
  50. {
  51. return false;
  52. }
  53. popped_item = std::move(q_.front());
  54. q_.pop_front();
  55. }
  56. pop_cv_.notify_one();
  57. return true;
  58. }
  59. // blocking dequeue without a timeout.
  60. void dequeue(T &popped_item)
  61. {
  62. {
  63. std::unique_lock<std::mutex> lock(queue_mutex_);
  64. push_cv_.wait(lock, [this] { return !this->q_.empty(); });
  65. popped_item = std::move(q_.front());
  66. q_.pop_front();
  67. }
  68. pop_cv_.notify_one();
  69. }
  70. #else
  71. // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
  72. // so release the mutex at the very end each function.
  73. // try to enqueue and block if no room left
  74. void enqueue(T &&item)
  75. {
  76. std::unique_lock<std::mutex> lock(queue_mutex_);
  77. pop_cv_.wait(lock, [this] { return !this->q_.full(); });
  78. q_.push_back(std::move(item));
  79. push_cv_.notify_one();
  80. }
  81. // enqueue immediately. overrun oldest message in the queue if no room left.
  82. void enqueue_nowait(T &&item)
  83. {
  84. std::unique_lock<std::mutex> lock(queue_mutex_);
  85. q_.push_back(std::move(item));
  86. push_cv_.notify_one();
  87. }
  88. // dequeue with a timeout.
  89. // Return true, if succeeded dequeue item, false otherwise
  90. bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
  91. {
  92. std::unique_lock<std::mutex> lock(queue_mutex_);
  93. if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
  94. {
  95. return false;
  96. }
  97. popped_item = std::move(q_.front());
  98. q_.pop_front();
  99. pop_cv_.notify_one();
  100. return true;
  101. }
  102. // blocking dequeue without a timeout.
  103. void dequeue(T &popped_item)
  104. {
  105. std::unique_lock<std::mutex> lock(queue_mutex_);
  106. push_cv_.wait(lock, [this] { return !this->q_.empty(); });
  107. popped_item = std::move(q_.front());
  108. q_.pop_front();
  109. pop_cv_.notify_one();
  110. }
  111. #endif
  112. size_t overrun_counter()
  113. {
  114. std::unique_lock<std::mutex> lock(queue_mutex_);
  115. return q_.overrun_counter();
  116. }
  117. size_t size()
  118. {
  119. std::unique_lock<std::mutex> lock(queue_mutex_);
  120. return q_.size();
  121. }
  122. void reset_overrun_counter()
  123. {
  124. std::unique_lock<std::mutex> lock(queue_mutex_);
  125. q_.reset_overrun_counter();
  126. }
  127. private:
  128. std::mutex queue_mutex_;
  129. std::condition_variable push_cv_;
  130. std::condition_variable pop_cv_;
  131. spdlog::details::circular_q<T> q_;
  132. };
  133. } // namespace details
  134. } // namespace spdlog