condition.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. //////////////////////////////////////////////////////////////////////////////
  2. //
  3. // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
  4. // Software License, Version 1.0. (See accompanying file
  5. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // See http://www.boost.org/libs/interprocess for documentation.
  8. //
  9. //////////////////////////////////////////////////////////////////////////////
  10. #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  11. #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  12. #ifndef BOOST_CONFIG_HPP
  13. # include <boost/config.hpp>
  14. #endif
  15. #
  16. #if defined(BOOST_HAS_PRAGMA_ONCE)
  17. # pragma once
  18. #endif
  19. #include <boost/interprocess/detail/config_begin.hpp>
  20. #include <boost/interprocess/detail/workaround.hpp>
  21. #include <boost/interprocess/sync/cv_status.hpp>
  22. #include <boost/interprocess/sync/spin/mutex.hpp>
  23. #include <boost/interprocess/detail/atomic.hpp>
  24. #include <boost/interprocess/sync/scoped_lock.hpp>
  25. #include <boost/interprocess/exceptions.hpp>
  26. #include <boost/interprocess/detail/os_thread_functions.hpp>
  27. #include <boost/interprocess/timed_utils.hpp>
  28. #include <boost/interprocess/sync/spin/wait.hpp>
  29. #include <boost/move/utility_core.hpp>
  30. #include <boost/cstdint.hpp>
  31. namespace boost {
  32. namespace interprocess {
  33. namespace ipcdetail {
  34. class spin_condition
  35. {
      //Condition variable whose waiters poll a shared command word.
      //State: a spin_mutex (m_enter_mut) plus two 32-bit words (m_command,
      //m_num_waiters) that are accessed through the atomic_* helpers.
      //Protocol, as implemented below:
      // - A waiter locks m_enter_mut, increments m_num_waiters, unlocks the
      //   caller's mutex, releases m_enter_mut and then polls m_command.
      // - notify() locks m_enter_mut and, if there are waiters, publishes a
      //   command and returns with m_enter_mut still locked.
      // - The waiter that consumes a NOTIFY_ONE, or the last waiter leaving a
      //   NOTIFY_ALL, unlocks m_enter_mut on behalf of the notifier.
      // - A waiter that times out takes m_enter_mut itself with try_lock(),
      //   decrements the count and unlocks it again.
      //
      //The copy constructor and copy assignment operator are declared
      //private and have no body here, which makes the class non-copyable.
  36. spin_condition(const spin_condition &);
  37. spin_condition &operator=(const spin_condition &);
  38. public:
      //Puts the condition in its idle state: command SLEEP, no waiters.
  39. spin_condition()
  40. {
  41. //Note that this class is initialized to zero (SLEEP is 0 and the
  42. //waiter count is 0), so zeroed memory can be interpreted as an
  43. //initialized condition variable
  44. m_command = SLEEP;
  45. m_num_waiters = 0;
  46. }
      //Wakes every thread still waiting before the object goes away.
  47. ~spin_condition()
  48. {
  49. //Notify all waiting threads
  50. //to allow POSIX semantics on condition destruction
  51. this->notify_all();
  52. }
      //Publishes a NOTIFY_ONE command; a no-op when nobody is waiting.
  53. void notify_one()
  54. { this->notify(NOTIFY_ONE); }
      //Publishes a NOTIFY_ALL command; a no-op when nobody is waiting.
  55. void notify_all()
  56. { this->notify(NOTIFY_ALL); }
      //Waits until a notification is consumed by this thread.
      //'lock' must test true, otherwise lock_exception is thrown. The mutex
      //returned by lock.mutex() is unlocked while waiting and locked again
      //before returning.
  57. template <typename L>
  58. void wait(L& lock)
  59. {
  60. if (!lock)
  61. throw lock_exception();
      //ustime(0u) is only a placeholder: with TimeoutEnabled == false the
      //time argument is never consulted at run time.
  62. this->do_timed_wait_impl<false>(ustime(0u), *lock.mutex());
  63. }
      //Same as wait(lock), repeated until pred() returns true. pred() is
      //evaluated before the first wait, so no wait happens if it already holds.
      //Throws lock_exception if 'lock' tests false.
  64. template <typename L, typename Pr>
  65. void wait(L& lock, Pr pred)
  66. {
  67. if (!lock)
  68. throw lock_exception();
  69. while (!pred())
  70. this->do_timed_wait_impl<false>(ustime(0u), *lock.mutex());
  71. }
      //Waits until notified or until abs_time is reached.
      //Returns true when a notification was consumed (or abs_time is positive
      //infinity, in which case an untimed wait is performed) and false when
      //the wait timed out. Throws lock_exception if 'lock' tests false.
  72. template <typename L, typename TimePoint>
  73. bool timed_wait(L& lock, const TimePoint &abs_time)
  74. {
  75. if (!lock)
  76. throw lock_exception();
  77. //Handle infinity absolute time here to avoid complications in do_timed_wait
  78. if(is_pos_infinity(abs_time)){
  79. this->wait(lock);
  80. return true;
  81. }
  82. return this->do_timed_wait_impl<true>(abs_time, *lock.mutex());
  83. }
      //Predicate version of timed_wait. Loops while pred() is false.
      //Returns true as soon as pred() holds; if a wait times out, returns
      //whatever pred() evaluates to at that moment.
      //Throws lock_exception if 'lock' tests false.
  84. template <typename L, typename TimePoint, typename Pr>
  85. bool timed_wait(L& lock, const TimePoint &abs_time, Pr pred)
  86. {
  87. if (!lock)
  88. throw lock_exception();
  89. //Handle infinity absolute time here to avoid complications in do_timed_wait
  90. if(is_pos_infinity(abs_time)){
  91. this->wait(lock, pred);
  92. return true;
  93. }
  94. while (!pred()){
  95. if (!this->do_timed_wait_impl<true>(abs_time, *lock.mutex()))
  96. return pred();
  97. }
  98. return true;
  99. }
       //Adapter over timed_wait(lock, abs_time): maps its true result to
       //cv_status::no_timeout and its false result to cv_status::timeout.
  100. template <typename L, class TimePoint>
  101. cv_status wait_until(L& lock, const TimePoint &abs_time)
  102. { return this->timed_wait(lock, abs_time) ? cv_status::no_timeout : cv_status::timeout; }
       //Adapter over timed_wait(lock, abs_time, pred); returns its result.
  103. template <typename L, class TimePoint, typename Pr>
  104. bool wait_until(L& lock, const TimePoint &abs_time, Pr pred)
  105. { return this->timed_wait(lock, abs_time, pred); }
       //Relative-time wait: converts 'dur' with duration_to_ustime() and
       //forwards to wait_until(lock, time).
       //NOTE(review): presumably duration_to_ustime() yields an absolute
       //deadline (now + dur) - confirm in timed_utils.hpp.
  106. template <typename L, class Duration>
  107. cv_status wait_for(L& lock, const Duration &dur)
  108. { return this->wait_until(lock, duration_to_ustime(dur)); }
       //Relative-time wait with predicate; same conversion as above, then
       //forwards to wait_until(lock, time, pred).
  109. template <typename L, class Duration, typename Pr>
  110. bool wait_for(L& lock, const Duration &dur, Pr pred)
  111. { return this->wait_until(lock, duration_to_ustime(dur), pred); }
  112. private:
       //Core wait routine shared by every public wait function.
       // TimeoutEnabled - compile-time switch; when false, abs_time is ignored
       //                  at run time and the wait can only end by notification.
       // abs_time       - deadline compared against get_now() while polling.
       // mut            - the caller's mutex; unlocked once this thread is
       //                  registered as a waiter and locked again before a
       //                  normal return.
       //Returns true if the wait ended because of a notification, false if
       //it timed out. It also returns false, without ever unlocking 'mut',
       //when the enter mutex could not be acquired (the internal lock tests
       //false after get_lock).
  113. template<bool TimeoutEnabled, class InterprocessMutex, class TimePoint>
  114. bool do_timed_wait_impl(const TimePoint &abs_time, InterprocessMutex &mut)
  115. {
  116. typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
  117. //The enter mutex guarantees that while executing a notification,
  118. //no other thread can execute the do_timed_wait method.
  119. {
  120. //---------------------------------------------------------------
  121. InternalLock lock;
  122. get_lock(bool_<TimeoutEnabled>(), m_enter_mut, lock, abs_time);
       //The lock does not own m_enter_mut: report it as a timeout.
       //'mut' is still locked at this point, as the caller expects.
  123. if(!lock)
  124. return false;
  125. //---------------------------------------------------------------
  126. //We increment the waiting thread count protected so that it will be
  127. //always constant when another thread enters the notification logic.
  128. //The increment marks this thread as "waiting on spin_condition"
  129. atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
  130. //We unlock the external mutex atomically with the increment
  131. mut.unlock();
       //Leaving this scope destroys 'lock', which releases m_enter_mut.
  132. }
  133. //By default, we suppose that no timeout has happened
  134. bool timed_out = false, unlock_enter_mut= false;
  135. //Loop until a notification indicates that the thread should
  136. //exit or timeout occurs
  137. while(1){
  138. //The thread sleeps/spins until a spin_condition commands a notification.
  139. //swait.yield() is called between successive polls of m_command.
  140. spin_wait swait;
  141. while(atomic_read32(&m_command) == SLEEP){
  142. swait.yield();
  143. //Check for timeout
  144. if(TimeoutEnabled){
  145. typedef typename microsec_clock<TimePoint>::time_point time_point;
  146. time_point now = get_now<TimePoint>(bool_<TimeoutEnabled>());
  147. if(now >= abs_time){
  148. //If we can lock the mutex it means that no notification
  149. //is being executed in this spin_condition variable
  150. timed_out = m_enter_mut.try_lock();
  151. //If locking fails, indicates that another thread is executing
  152. //notification, so we play the notification game
  153. if(!timed_out){
  154. //There is an ongoing notification, we will try again later
       //(this 'continue' re-tests m_command in the inner polling loop)
  155. continue;
  156. }
  157. //No notification in execution, since enter mutex is locked.
  158. //We will execute time-out logic, so we will decrement count,
  159. //release the enter mutex and return false.
  160. break;
  161. }
  162. }
  163. }
  164. //If a timeout occurred, the thread will not execute checking logic
  165. if(TimeoutEnabled && timed_out){
  166. //Decrement wait count
  167. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
       //This thread took m_enter_mut with try_lock() above and must release it
  168. unlock_enter_mut = true;
  169. break;
  170. }
  171. else{
       //Try to consume a NOTIFY_ONE command by swapping it back to SLEEP.
       //NOTE(review): argument order looks like (memory, new value, expected
       //value) with the previous value returned, judging from how 'result'
       //is used below - confirm in detail/atomic.hpp.
  172. boost::uint32_t result = atomic_cas32
  173. (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
  174. if(result == SLEEP){
  175. //Other thread has been notified and since it was a NOTIFY one
  176. //command, this thread must sleep again
  177. continue;
  178. }
  179. else if(result == NOTIFY_ONE){
  180. //If it was a NOTIFY_ONE command, only this thread should
  181. //exit. This thread has atomically marked command as sleep before
  182. //so no other thread will exit.
  183. //Decrement wait count.
  184. unlock_enter_mut = true;
  185. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  186. break;
  187. }
  188. else{
  189. //If it is a NOTIFY_ALL command, all threads should return
  190. //from do_timed_wait function. Decrement wait count.
       //NOTE(review): comparing with 1 implies atomic_dec32 returns the value
       //held before the decrement - confirm in detail/atomic.hpp.
  191. unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  192. //Check if this is the last thread of notify_all waiters
  193. //Only the last thread will release the mutex
  194. if(unlock_enter_mut){
       //The last waiter also puts the command word back to SLEEP
  195. atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
  196. }
  197. break;
  198. }
  199. }
  200. }
  201. //Unlock the enter mutex if it is a single notification, if this is
  202. //the last notified thread in a notify_all or a timeout has occurred
  203. if(unlock_enter_mut){
  204. m_enter_mut.unlock();
  205. }
  206. //Lock external again before returning from the method
  207. mut.lock();
  208. return !timed_out;
  209. }
       //Clock read used when timeouts are enabled: returns
       //microsec_clock<TimePoint>::universal_time().
  210. template <class TimePoint>
  211. static typename microsec_clock<TimePoint>::time_point get_now(bool_<true>)
  212. { return microsec_clock<TimePoint>::universal_time(); }
       //Overload selected when timeouts are disabled: returns a
       //default-constructed time_point without reading any clock. The only
       //call site sits behind 'if(TimeoutEnabled)', so it is not reached at
       //run time in that configuration.
  213. template <class TimePoint>
  214. static typename microsec_clock<TimePoint>::time_point get_now(bool_<false>)
  215. { return typename microsec_clock<TimePoint>::time_point(); }
       //Timed acquisition: builds a temporary Lock from (m, abs_time) and
       //move-assigns it into 'lck'. The caller checks 'lck' afterwards.
       //NOTE(review): presumably Lock(m, abs_time) is a timed lock attempt
       //that leaves the lock not owning 'm' on timeout - confirm in
       //scoped_lock.hpp.
  216. template <class Mutex, class Lock, class TimePoint>
  217. static void get_lock(bool_<true>, Mutex &m, Lock &lck, const TimePoint &abs_time)
  218. {
  219. Lock dummy(m, abs_time);
  220. lck = boost::move(dummy);
  221. }
       //Untimed acquisition: builds a temporary Lock from 'm' alone and
       //move-assigns it into 'lck'. The time argument is ignored.
  222. template <class Mutex, class Lock, class TimePoint>
  223. static void get_lock(bool_<false>, Mutex &m, Lock &lck, const TimePoint &)
  224. {
  225. Lock dummy(m);
  226. lck = boost::move(dummy);
  227. }
       //Common implementation of notify_one()/notify_all().
       // command - NOTIFY_ONE or NOTIFY_ALL.
       //With no waiters the function locks and unlocks m_enter_mut and
       //returns. Otherwise it returns with m_enter_mut still LOCKED; a waiter
       //in do_timed_wait_impl unlocks it once the command has been consumed.
  228. void notify(boost::uint32_t command)
  229. {
  230. //This mutex guarantees that no other thread can enter to the
  231. //do_timed_wait method logic, so that thread count will be
  232. //constant until the function writes a NOTIFY_ALL command.
  233. //It also guarantees that no other notification can be signaled
  234. //on this spin_condition before this one ends
  235. m_enter_mut.lock();
  236. //Return if there are no waiters
  237. if(!atomic_read32(&m_num_waiters)) {
  238. m_enter_mut.unlock();
  239. return;
  240. }
  241. //Notify that all threads should execute wait logic.
       //Retry (yielding in between) until the CAS reports that the previous
       //value of m_command was SLEEP.
       //NOTE(review): this reads as "install 'command' once m_command is
       //SLEEP" - confirm the atomic_cas32 argument order in detail/atomic.hpp.
  242. spin_wait swait;
  243. while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
  244. swait.yield();
  245. }
  246. //The enter mutex will rest locked until the last waiting thread unlocks it
  247. }
       //Values stored in m_command. SLEEP is 0 so that zeroed memory reads as
       //"no notification pending".
  248. enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
       //Serializes waiter registration against notifications (see notify()).
  249. spin_mutex m_enter_mut;
       //Current command: one of SLEEP, NOTIFY_ONE, NOTIFY_ALL.
  250. volatile boost::uint32_t m_command;
       //Number of threads currently registered as waiting.
  251. volatile boost::uint32_t m_num_waiters;
  252. };
  253. } //namespace ipcdetail
  254. } //namespace interprocess
  255. } //namespace boost
  256. #include <boost/interprocess/detail/config_end.hpp>
  257. #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP