123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- #ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
- #define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
- #include <atomic>
- #include <cstddef>
- #include <cstdint>
- #include <memory>
- #include <type_traits>
- #include <utility>
- #include <boost/assert.hpp>
- #include <boost/config.hpp>
- #include <boost/fiber/detail/config.hpp>
- #include <boost/fiber/context.hpp>
- #if BOOST_COMP_CLANG
- #pragma clang diagnostic push
- #pragma clang diagnostic ignored "-Wunused-private-field"
- #endif
- namespace boost {
- namespace fibers {
- namespace detail {
- class context_spmc_queue {
- private:
- class array {
- private:
- typedef std::atomic< context * > atomic_type;
- typedef atomic_type storage_type;
- std::size_t capacity_;
- storage_type * storage_;
- public:
- array( std::size_t capacity) :
- capacity_{ capacity },
- storage_{ new storage_type[capacity_] } {
- for ( std::size_t i = 0; i < capacity_; ++i) {
- ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
- }
- }
- ~array() {
- for ( std::size_t i = 0; i < capacity_; ++i) {
- reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
- }
- delete [] storage_;
- }
- std::size_t capacity() const noexcept {
- return capacity_;
- }
- void push( std::size_t bottom, context * ctx) noexcept {
- reinterpret_cast< atomic_type * >(
- std::addressof( storage_[bottom % capacity_]) )
- ->store( ctx, std::memory_order_relaxed);
- }
- context * pop( std::size_t top) noexcept {
- return reinterpret_cast< atomic_type * >(
- std::addressof( storage_[top % capacity_]) )
- ->load( std::memory_order_relaxed);
- }
- array * resize( std::size_t bottom, std::size_t top) {
- std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
- for ( std::size_t i = top; i != bottom; ++i) {
- tmp->push( i, pop( i) );
- }
- return tmp.release();
- }
- };
- std::atomic< std::size_t > top_{ 0 };
- std::atomic< std::size_t > bottom_{ 0 };
- std::atomic< array * > array_;
- std::vector< array * > old_arrays_{};
- char padding_[cacheline_length];
- public:
- context_spmc_queue( std::size_t capacity = 4096) :
- array_{ new array{ capacity } } {
- old_arrays_.reserve( 32);
- }
- ~context_spmc_queue() {
- for ( array * a : old_arrays_) {
- delete a;
- }
- delete array_.load();
- }
- context_spmc_queue( context_spmc_queue const&) = delete;
- context_spmc_queue & operator=( context_spmc_queue const&) = delete;
- bool empty() const noexcept {
- std::size_t bottom = bottom_.load( std::memory_order_relaxed);
- std::size_t top = top_.load( std::memory_order_relaxed);
- return bottom <= top;
- }
- void push( context * ctx) {
- std::size_t bottom = bottom_.load( std::memory_order_relaxed);
- std::size_t top = top_.load( std::memory_order_acquire);
- array * a = array_.load( std::memory_order_relaxed);
- if ( (a->capacity() - 1) < (bottom - top) ) {
-
-
- array * tmp = a->resize( bottom, top);
- old_arrays_.push_back( a);
- std::swap( a, tmp);
- array_.store( a, std::memory_order_relaxed);
- }
- a->push( bottom, ctx);
- std::atomic_thread_fence( std::memory_order_release);
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- context * pop() {
- std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
- array * a = array_.load( std::memory_order_relaxed);
- bottom_.store( bottom, std::memory_order_relaxed);
- std::atomic_thread_fence( std::memory_order_seq_cst);
- std::size_t top = top_.load( std::memory_order_relaxed);
- context * ctx = nullptr;
- if ( top <= bottom) {
-
- ctx = a->pop( bottom);
- BOOST_ASSERT( nullptr != ctx);
- if ( top == bottom) {
-
- if ( ! top_.compare_exchange_strong( top, top + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed) ) {
-
- ctx = nullptr;
- }
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- } else {
-
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- return ctx;
- }
- context * steal() {
- std::size_t top = top_.load( std::memory_order_acquire);
- std::atomic_thread_fence( std::memory_order_seq_cst);
- std::size_t bottom = bottom_.load( std::memory_order_acquire);
- context * ctx = nullptr;
- if ( top < bottom) {
-
- array * a = array_.load( std::memory_order_consume);
- ctx = a->pop( top);
- BOOST_ASSERT( nullptr != ctx);
-
- if ( ctx->is_context( type::pinned_context) ) {
- return nullptr;
- }
- if ( ! top_.compare_exchange_strong( top, top + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed) ) {
-
- return nullptr;
- }
- }
- return ctx;
- }
- };
- }}}
- #if BOOST_COMP_CLANG
- #pragma clang diagnostic pop
- #endif
- #endif
|