30 #ifndef DMLC_CONCURRENTQUEUE_H_ 31 #define DMLC_CONCURRENTQUEUE_H_ 38 #pragma GCC diagnostic push 39 #pragma GCC diagnostic ignored "-Wconversion" 41 #ifdef MCDBGQ_USE_RELACY 42 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast" 46 #if defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__) || defined(_WIN64) 50 #if defined(__APPLE__) 51 #include "TargetConditionals.h" 54 #ifdef MCDBGQ_USE_RELACY 55 #include "relacy/relacy_std.hpp" 56 #include "relacy_shims.h" 70 #include <type_traits> 81 namespace moodycamel {
namespace details {
// Generic fallback thread-ID converter: the ID is used directly as both its
// numeric form and its hash. Platform-specific specializations (e.g. for
// std::thread::id) override this with a real pre-hash.
template<typename thread_id_t> struct thread_id_converter
{
	typedef thread_id_t thread_id_numeric_size_t;
	typedef thread_id_t thread_id_hash_t;
	// Identity pre-hash; a later mixing step provides avalanche behaviour.
	static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
};
88 #if defined(MCDBGQ_USE_RELACY) 89 namespace moodycamel {
namespace details {
90 typedef std::uint32_t thread_id_t;
91 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
92 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
93 static inline thread_id_t thread_id() {
return rl::thread_index(); }
95 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__) 98 extern "C" __declspec(dllimport) unsigned
long __stdcall GetCurrentThreadId(
void);
99 namespace moodycamel {
namespace details {
100 static_assert(
sizeof(
unsigned long) ==
sizeof(std::uint32_t),
"Expected size of unsigned long to be 32 bits on Windows");
101 typedef std::uint32_t thread_id_t;
102 static const thread_id_t invalid_thread_id = 0;
103 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU;
104 static inline thread_id_t thread_id() {
return static_cast<thread_id_t
>(::GetCurrentThreadId()); }
106 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) 107 namespace moodycamel {
namespace details {
108 static_assert(
sizeof(std::thread::id) == 4 ||
sizeof(std::thread::id) == 8,
"std::thread::id is expected to be either 4 or 8 bytes");
110 typedef std::thread::id thread_id_t;
111 static const thread_id_t invalid_thread_id;
116 static inline thread_id_t thread_id() {
return std::this_thread::get_id(); }
118 template<std::
size_t>
struct thread_id_size { };
119 template<>
struct thread_id_size<4> {
typedef std::uint32_t numeric_t; };
120 template<>
struct thread_id_size<8> {
typedef std::uint64_t numeric_t; };
122 template<>
struct thread_id_converter<thread_id_t> {
123 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
125 typedef std::size_t thread_id_hash_t;
127 typedef thread_id_numeric_size_t thread_id_hash_t;
130 static thread_id_hash_t prehash(thread_id_t
const& x)
133 return std::hash<std::thread::id>()(x);
135 return *
reinterpret_cast<thread_id_hash_t const*
>(&x);
144 #if defined(__GNUC__) || defined(__INTEL_COMPILER) 145 #define MOODYCAMEL_THREADLOCAL __thread 146 #elif defined(_MSC_VER) 147 #define MOODYCAMEL_THREADLOCAL __declspec(thread) 150 #define MOODYCAMEL_THREADLOCAL thread_local 152 namespace moodycamel {
namespace details {
// Portable default: the address of a thread-local variable is unique per
// live thread, so it serves as the thread ID without any OS calls.
// NOTE(review): this sits in a preprocessor #else branch whose delimiters
// appear to have been lost in this copy -- confirm against upstream.
153 typedef std::uintptr_t thread_id_t;
154 static const thread_id_t invalid_thread_id = 0;
155 static const thread_id_t invalid_thread_id2 = 1;
156 static inline thread_id_t thread_id() {
static MOODYCAMEL_THREADLOCAL
int x;
return reinterpret_cast<thread_id_t
>(&x); }
161 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED 162 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__)) 163 #define MOODYCAMEL_EXCEPTIONS_ENABLED 166 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED 167 #define MOODYCAMEL_TRY try 168 #define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__) 169 #define MOODYCAMEL_RETHROW throw 170 #define MOODYCAMEL_THROW(expr) throw (expr) 172 #define MOODYCAMEL_TRY if (true) 173 #define MOODYCAMEL_CATCH(...) else if (false) 174 #define MOODYCAMEL_RETHROW 175 #define MOODYCAMEL_THROW(expr) 178 #ifndef MOODYCAMEL_NOEXCEPT 179 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED) 180 #define MOODYCAMEL_NOEXCEPT 181 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true 182 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true 183 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800 186 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT 187 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value) 188 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)) 189 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900 190 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT 191 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? 
std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value) 192 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)) 194 #define MOODYCAMEL_NOEXCEPT noexcept 195 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr) 196 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr) 200 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 201 #ifdef MCDBGQ_USE_RELACY 202 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 207 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__) 216 #ifndef MOODYCAMEL_DELETE_FUNCTION 217 #if defined(_MSC_VER) && _MSC_VER < 1800 218 #define MOODYCAMEL_DELETE_FUNCTION 220 #define MOODYCAMEL_DELETE_FUNCTION = delete 225 namespace moodycamel {
namespace details {
226 #if defined(__GNUC__) 227 inline bool likely(
bool x) {
return __builtin_expect((x),
true); }
228 inline bool unlikely(
bool x) {
return __builtin_expect((x),
false); }
// Non-GCC fallback: no branch-prediction intrinsics available, so these are
// identity functions. NOTE(review): the #else/#endif lines separating the
// two pairs appear to have been dropped from this copy -- verify upstream.
230 inline bool likely(
bool x) {
return x; }
231 inline bool unlikely(
bool x) {
return x; }
235 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 236 #include "internal/concurrentqueue_internal_debug.h" 239 namespace moodycamel {
// Compile-time maximum value of an integral type T (works pre-C++14 where
// numeric_limits<T>::max() was not constexpr on all implementations).
template<typename T>
struct const_numeric_max {
	static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
	static const T value = std::numeric_limits<T>::is_signed
		// Signed: set all bits except the sign bit.
		? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
		// Unsigned: all bits set.
		: static_cast<T>(-1);
};
249 #if defined(__GLIBCXX__) 250 typedef ::max_align_t std_max_align_t;
252 typedef std::max_align_t std_max_align_t;
270 struct ConcurrentQueueDefaultTraits
273 typedef std::size_t size_t;
292 static const size_t BLOCK_SIZE = 32;
299 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
303 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
307 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
313 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
318 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
324 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
327 #ifndef MCDBGQ_USE_RELACY 330 #if defined(malloc) || defined(free) 333 static inline void* WORKAROUND_malloc(
size_t size) {
return malloc(size); }
334 static inline void WORKAROUND_free(
void* ptr) {
return free(ptr); }
335 static inline void* (malloc)(
size_t size) {
return WORKAROUND_malloc(size); }
336 static inline void (free)(
void* ptr) {
return WORKAROUND_free(ptr); }
338 static inline void* malloc(
size_t size) {
return std::malloc(size); }
339 static inline void free(
void* ptr) {
return std::free(ptr); }
344 static inline void* malloc(
size_t size) {
return rl::rl_malloc(size, $); }
345 static inline void free(
void* ptr) {
return rl::rl_free(ptr, $); }
357 struct ProducerToken;
358 struct ConsumerToken;
360 template<
typename T,
typename Traits>
class ConcurrentQueue;
361 template<
typename T,
typename Traits>
class BlockingConcurrentQueue;
362 class ConcurrentQueueTests;
367 struct ConcurrentQueueProducerTypelessBase
369 ConcurrentQueueProducerTypelessBase* next;
370 std::atomic<bool> inactive;
371 ProducerToken* token;
373 ConcurrentQueueProducerTypelessBase()
374 : next(nullptr), inactive(false), token(nullptr)
// MurmurHash3 finalizer-based mixers used to hash thread IDs.
// Primary template: 32-bit mix (chosen when the value is <= 4 bytes).
template<bool use32> struct _hash_32_or_64 {
	static inline std::uint32_t hash(std::uint32_t h)
	{
		// MurmurHash3 32-bit finalizer: avalanches all input bits.
		h ^= h >> 16;
		h *= 0x85ebca6b;
		h ^= h >> 13;
		h *= 0xc2b2ae35;
		return h ^ (h >> 16);
	}
};
// 64-bit mix (chosen when the value is larger than 4 bytes).
template<> struct _hash_32_or_64<1> {
	static inline std::uint64_t hash(std::uint64_t h)
	{
		// MurmurHash3 64-bit finalizer.
		h ^= h >> 33;
		h *= 0xff51afd7ed558ccd;
		h ^= h >> 33;
		h *= 0xc4ceb9fe1a85ec53;
		return h ^ (h >> 33);
	}
};
// Dispatch on the byte size of the value being hashed.
template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
405 static inline size_t hash_thread_id(thread_id_t
id)
407 static_assert(
sizeof(thread_id_t) <= 8,
"Expected a platform where thread IDs are at most 64-bit values");
408 return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
409 thread_id_converter<thread_id_t>::prehash(
id)));
// Wrap-around-aware "a < b" for unsigned counters: a is "less" when b is
// ahead of a by less than half the type's range, so comparisons stay correct
// across index wraparound.
template<typename T>
static inline bool circular_less_than(T a, T b)
{
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4554)
#endif
	static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
		"circular_less_than is intended to be used only with unsigned integer types");
	return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
#ifdef _MSC_VER
#pragma warning(pop)
#endif
}
// Rounds ptr up to the next address suitably aligned for U (no-op when ptr
// is already aligned).
template<typename U>
static inline char* align_for(char* ptr)
{
	const std::size_t alignment = std::alignment_of<U>::value;
	return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
}
// Rounds x up to the nearest power of two (returns x when it already is one).
// Uses the classic bit-smearing trick; the loop extends it to any width of T.
template<typename T>
static inline T ceil_to_pow_2(T x)
{
	static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
		"ceil_to_pow_2 is intended to be used only with unsigned integer types");
	// Smear the highest set bit into every lower position, then add one.
	--x;
	x |= x >> 1;
	x |= x >> 2;
	x |= x >> 4;
	for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
		x |= x >> (i << 3);
	}
	++x;
	return x;
}
// Swaps two atomics using only relaxed loads/stores. NOT atomic as a whole;
// callers must ensure no concurrent access during the swap.
template<typename T>
static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
{
	T temp = std::move(left.load(std::memory_order_relaxed));
	left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
	right.store(std::move(temp), std::memory_order_relaxed);
}
// Returns its argument as a const lvalue reference, suppressing any move.
template<typename T>
static inline T const& nomove(T const& x)
{
	return x;
}
// When Enable is true, eval() forces const-lvalue semantics (no move);
// the <false> specialization perfect-forwards instead.
template<bool Enable>
struct nomove_if
{
	template<typename T>
	static inline T const& eval(T const& x)
	{
		return x;
	}
};
475 struct nomove_if<false>
478 static inline auto eval(U&& x)
479 -> decltype(std::forward<U>(x))
481 return std::forward<U>(x);
485 template<
typename It>
486 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
// Polyfill: GCC < 4.8 lacks std::is_trivially_destructible and only offers
// the older std::has_trivial_destructor trait.
// NOTE(review): the #else/#endif separating the two definitions appear to
// have been lost in this copy -- confirm against upstream concurrentqueue.h.
491 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) 492 template<
typename T>
struct is_trivially_destructible : std::is_trivially_destructible<T> { };
494 template<
typename T>
struct is_trivially_destructible : std::has_trivial_destructor<T> { };
497 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 498 #ifdef MCDBGQ_USE_RELACY 499 typedef RelacyThreadExitListener ThreadExitListener;
500 typedef RelacyThreadExitNotifier ThreadExitNotifier;
// Node in a per-thread list of callbacks run when the thread exits.
struct ThreadExitListener
{
	typedef void (*callback_t)(void*);
	callback_t callback;        // invoked at thread exit
	void* userData;             // opaque argument passed to callback

	ThreadExitListener* next;   // reserved for use by the ThreadExitNotifier
};
512 class ThreadExitNotifier
515 static void subscribe(ThreadExitListener* listener)
517 auto& tlsInst = instance();
518 listener->next = tlsInst.tail;
519 tlsInst.tail = listener;
522 static void unsubscribe(ThreadExitListener* listener)
524 auto& tlsInst = instance();
525 ThreadExitListener** prev = &tlsInst.tail;
526 for (
auto ptr = tlsInst.tail; ptr !=
nullptr; ptr = ptr->next) {
527 if (ptr == listener) {
536 ThreadExitNotifier() : tail(nullptr) { }
537 ThreadExitNotifier(ThreadExitNotifier
const&) MOODYCAMEL_DELETE_FUNCTION;
538 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
540 ~ThreadExitNotifier()
543 assert(
this == &instance() &&
"If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
544 for (
auto ptr = tail; ptr !=
nullptr; ptr = ptr->next) {
545 ptr->callback(ptr->userData);
550 static inline ThreadExitNotifier& instance()
552 static thread_local ThreadExitNotifier notifier;
557 ThreadExitListener* tail;
562 template<
typename T>
struct static_is_lock_free_num {
enum { value = 0 }; };
563 template<>
struct static_is_lock_free_num<signed char> {
enum { value = ATOMIC_CHAR_LOCK_FREE }; };
564 template<>
struct static_is_lock_free_num<short> {
enum { value = ATOMIC_SHORT_LOCK_FREE }; };
565 template<>
struct static_is_lock_free_num<int> {
enum { value = ATOMIC_INT_LOCK_FREE }; };
566 template<>
struct static_is_lock_free_num<long> {
enum { value = ATOMIC_LONG_LOCK_FREE }; };
567 template<>
struct static_is_lock_free_num<long long> {
enum { value = ATOMIC_LLONG_LOCK_FREE }; };
568 template<
typename T>
struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
569 template<>
struct static_is_lock_free<bool> {
enum { value = ATOMIC_BOOL_LOCK_FREE }; };
570 template<
typename U>
struct static_is_lock_free<U*> {
enum { value = ATOMIC_POINTER_LOCK_FREE }; };
// A producer token can be created from either queue flavour; the
// definitions live with the corresponding queue class.
576 template<
typename T,
typename Traits>
577 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
579 template<
typename T,
typename Traits>
580 explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
582 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
583 : producer(other.producer)
585 other.producer =
nullptr;
586 if (producer !=
nullptr) {
587 producer->token =
this;
591 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
597 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
599 std::swap(producer, other.producer);
600 if (producer !=
nullptr) {
601 producer->token =
this;
603 if (other.producer !=
nullptr) {
604 other.producer->token = &other;
616 inline bool valid()
const {
return producer !=
nullptr; }
620 if (producer !=
nullptr) {
621 producer->token =
nullptr;
622 producer->inactive.store(
true, std::memory_order_release);
// Tokens are move-only: copying is explicitly disabled.
627 ProducerToken(ProducerToken
const&) MOODYCAMEL_DELETE_FUNCTION;
628 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
// The queue (and its tests) need direct access to `producer`.
631 template<typename T, typename Traits> friend class ConcurrentQueue;
632 friend class ConcurrentQueueTests;
// Type-erased producer this token is bound to; null when the token is invalid.
635 details::ConcurrentQueueProducerTypelessBase* producer;
// A consumer token can be created from either queue flavour; the
// definitions live with the corresponding queue class.
641 template<
typename T,
typename Traits>
642 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
644 template<
typename T,
typename Traits>
645 explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
647 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
648 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
652 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
658 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
660 std::swap(initialOffset, other.initialOffset);
661 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
662 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
663 std::swap(currentProducer, other.currentProducer);
664 std::swap(desiredProducer, other.desiredProducer);
// Move-only: copying a consumer token is disabled.
668 ConsumerToken(ConsumerToken
const&) MOODYCAMEL_DELETE_FUNCTION;
669 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
672 template<typename T, typename Traits> friend class ConcurrentQueue;
673 friend class ConcurrentQueueTests;
// Rotation state used to spread this consumer fairly across producers.
676 std::uint32_t initialOffset;
677 std::uint32_t lastKnownGlobalOffset;
678 std::uint32_t itemsConsumedFromCurrent;
679 details::ConcurrentQueueProducerTypelessBase* currentProducer;
680 details::ConcurrentQueueProducerTypelessBase* desiredProducer;
685 template<typename T, typename Traits>
686 inline
void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
689 template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
690 class ConcurrentQueue {
692 typedef ::dmlc::moodycamel::ProducerToken producer_token_t;
693 typedef ::dmlc::moodycamel::ConsumerToken consumer_token_t;
696 typedef typename Traits::size_t size_t;
698 static const size_t BLOCK_SIZE =
static_cast<size_t>(Traits::BLOCK_SIZE);
699 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD =
static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
700 static const size_t EXPLICIT_INITIAL_INDEX_SIZE =
static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
701 static const size_t IMPLICIT_INITIAL_INDEX_SIZE =
static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
702 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE =
static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
703 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
static_cast<std::uint32_t
>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
705 #pragma warning(push) 706 #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!) 707 #pragma warning(disable: 4309) // static_cast: Truncation of constant value 709 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value -
710 static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) <
711 BLOCK_SIZE) ? details::const_numeric_max<size_t>::value
713 (
static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) +
714 (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
719 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
720 "Traits::size_t must be an unsigned integral type");
721 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
722 "Traits::index_t must be an unsigned integral type");
723 static_assert(
sizeof(index_t) >=
sizeof(
size_t),
724 "Traits::index_t must be at least as wide as Traits::size_t");
725 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
726 "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
727 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) &&
728 !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD &
729 (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
730 "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
731 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) &&
732 !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
733 "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
734 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) &&
735 !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
736 "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
737 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) ||
738 !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
739 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
741 INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
742 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
755 explicit ConcurrentQueue(
size_t capacity = 6 * BLOCK_SIZE)
756 : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(
757 0), globalExplicitConsumerOffset(0) {
758 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
759 populate_initial_implicit_producer_hash();
760 populate_initial_block_list(
761 capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
763 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 768 explicitProducers.store(
nullptr, std::memory_order_relaxed);
769 implicitProducers.store(
nullptr, std::memory_order_relaxed);
776 ConcurrentQueue(
size_t minCapacity,
size_t maxExplicitProducers,
size_t maxImplicitProducers)
777 : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(
778 0), globalExplicitConsumerOffset(0) {
779 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
780 populate_initial_implicit_producer_hash();
782 (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) +
783 2 * (maxExplicitProducers + maxImplicitProducers);
784 populate_initial_block_list(blocks);
786 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 787 explicitProducers.store(
nullptr, std::memory_order_relaxed);
788 implicitProducers.store(
nullptr, std::memory_order_relaxed);
797 auto ptr = producerListTail.load(std::memory_order_relaxed);
798 while (ptr !=
nullptr) {
799 auto next = ptr->next_prod();
800 if (ptr->token !=
nullptr) {
801 ptr->token->producer =
nullptr;
808 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
809 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
810 while (hash !=
nullptr) {
811 auto prev = hash->prev;
814 for (
size_t i = 0; i != hash->capacity; ++i) {
815 hash->entries[i].~ImplicitProducerKVP();
817 hash->~ImplicitProducerHash();
818 (Traits::free)(hash);
825 auto block = freeList.head_unsafe();
826 while (block !=
nullptr) {
827 auto next = block->freeListNext.load(std::memory_order_relaxed);
828 if (block->dynamicallyAllocated) {
835 destroy_array(initialBlockPool, initialBlockPoolSize);
839 ConcurrentQueue(ConcurrentQueue
const &) MOODYCAMEL_DELETE_FUNCTION;
841 ConcurrentQueue &operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
849 ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
850 : producerListTail(other.producerListTail.load(
std::memory_order_relaxed)), producerCount(
851 other.producerCount.load(
std::memory_order_relaxed)), initialBlockPoolIndex(
852 other.initialBlockPoolIndex.load(
std::memory_order_relaxed)), initialBlockPool(
853 other.initialBlockPool), initialBlockPoolSize(other.initialBlockPoolSize), freeList(
854 std::move(other.freeList)), nextExplicitConsumerId(
855 other.nextExplicitConsumerId.load(
std::memory_order_relaxed)), globalExplicitConsumerOffset(
856 other.globalExplicitConsumerOffset.load(
std::memory_order_relaxed)) {
858 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
859 populate_initial_implicit_producer_hash();
860 swap_implicit_producer_hashes(other);
862 other.producerListTail.store(
nullptr, std::memory_order_relaxed);
863 other.producerCount.store(0, std::memory_order_relaxed);
864 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
865 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
867 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 868 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
869 other.explicitProducers.store(
nullptr, std::memory_order_relaxed);
870 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
871 other.implicitProducers.store(
nullptr, std::memory_order_relaxed);
874 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
875 other.initialBlockPoolSize = 0;
876 other.initialBlockPool =
nullptr;
881 inline ConcurrentQueue &operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT {
882 return swap_internal(other);
890 inline void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT {
891 swap_internal(other);
895 ConcurrentQueue &swap_internal(ConcurrentQueue &other) {
896 if (
this == &other) {
900 details::swap_relaxed(producerListTail, other.producerListTail);
901 details::swap_relaxed(producerCount, other.producerCount);
902 details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
903 std::swap(initialBlockPool, other.initialBlockPool);
904 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
905 freeList.swap(other.freeList);
906 details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
907 details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
909 swap_implicit_producer_hashes(other);
912 other.reown_producers();
914 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 915 details::swap_relaxed(explicitProducers, other.explicitProducers);
916 details::swap_relaxed(implicitProducers, other.implicitProducers);
928 inline bool enqueue(T
const &item) {
929 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return false;
930 return inner_enqueue<CanAlloc>(item);
938 inline bool enqueue(T &&item) {
939 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return false;
940 return inner_enqueue<CanAlloc>(std::move(item));
947 inline bool enqueue(producer_token_t
const &token, T
const &item) {
948 return inner_enqueue<CanAlloc>(token, item);
955 inline bool enqueue(producer_token_t
const &token, T &&item) {
956 return inner_enqueue<CanAlloc>(token, std::move(item));
965 template<
typename It>
966 bool enqueue_bulk(It itemFirst,
size_t count) {
967 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return false;
968 return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
977 template<
typename It>
978 bool enqueue_bulk(producer_token_t
const &token, It itemFirst,
size_t count) {
979 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
987 inline bool try_enqueue(T
const &item) {
988 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return false;
989 return inner_enqueue<CannotAlloc>(item);
997 inline bool try_enqueue(T &&item) {
998 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return false;
999 return inner_enqueue<CannotAlloc>(std::move(item));
1005 inline bool try_enqueue(producer_token_t
const &token, T
const &item) {
1006 return inner_enqueue<CannotAlloc>(token, item);
1012 inline bool try_enqueue(producer_token_t
const &token, T &&item) {
1013 return inner_enqueue<CannotAlloc>(token, std::move(item));
1023 template<
typename It>
1024 bool try_enqueue_bulk(It itemFirst,
size_t count) {
1025 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return false;
1026 return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1034 template<
typename It>
1035 bool try_enqueue_bulk(producer_token_t
const &token, It itemFirst,
size_t count) {
1036 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1044 template<
typename U>
1045 bool try_dequeue(U &item) {
1048 size_t nonEmptyCount = 0;
1049 ProducerBase *best =
nullptr;
1050 size_t bestSize = 0;
1051 for (
auto ptr = producerListTail.load(std::memory_order_acquire);
1052 nonEmptyCount < 3 && ptr !=
nullptr; ptr = ptr->next_prod()) {
1053 auto size = ptr->size_approx();
1055 if (size > bestSize) {
1065 if (nonEmptyCount > 0) {
1066 if (details::likely(best->dequeue(item))) {
1069 for (
auto ptr = producerListTail.load(std::memory_order_acquire);
1070 ptr !=
nullptr; ptr = ptr->next_prod()) {
1071 if (ptr != best && ptr->dequeue(item)) {
1088 template<
typename U>
1089 bool try_dequeue_non_interleaved(U &item) {
1090 for (
auto ptr = producerListTail.load(std::memory_order_acquire);
1091 ptr !=
nullptr; ptr = ptr->next_prod()) {
1092 if (ptr->dequeue(item)) {
1103 template<
typename U>
1104 bool try_dequeue(consumer_token_t &token, U &item) {
1111 if (token.desiredProducer ==
nullptr || token.lastKnownGlobalOffset !=
1112 globalExplicitConsumerOffset.load(
1113 std::memory_order_relaxed)) {
1114 if (!update_current_producer_after_rotation(token)) {
1121 if (static_cast<ProducerBase *>(token.currentProducer)->dequeue(item)) {
1122 if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1123 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1128 auto tail = producerListTail.load(std::memory_order_acquire);
1129 auto ptr =
static_cast<ProducerBase *
>(token.currentProducer)->next_prod();
1130 if (ptr ==
nullptr) {
1133 while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1134 if (ptr->dequeue(item)) {
1135 token.currentProducer = ptr;
1136 token.itemsConsumedFromCurrent = 1;
1139 ptr = ptr->next_prod();
1140 if (ptr ==
nullptr) {
1152 template<
typename It>
1153 size_t try_dequeue_bulk(It itemFirst,
size_t max) {
1155 for (
auto ptr = producerListTail.load(std::memory_order_acquire);
1156 ptr !=
nullptr; ptr = ptr->next_prod()) {
1157 count += ptr->dequeue_bulk(itemFirst, max - count);
1170 template<
typename It>
1171 size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst,
size_t max) {
1172 if (token.desiredProducer ==
nullptr || token.lastKnownGlobalOffset !=
1173 globalExplicitConsumerOffset.load(
1174 std::memory_order_relaxed)) {
1175 if (!update_current_producer_after_rotation(token)) {
1180 size_t count =
static_cast<ProducerBase *
>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1182 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >=
1183 EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1184 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1188 token.itemsConsumedFromCurrent +=
static_cast<std::uint32_t
>(count);
1191 auto tail = producerListTail.load(std::memory_order_acquire);
1192 auto ptr =
static_cast<ProducerBase *
>(token.currentProducer)->next_prod();
1193 if (ptr ==
nullptr) {
1196 while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1197 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1199 if (dequeued != 0) {
1200 token.currentProducer = ptr;
1201 token.itemsConsumedFromCurrent =
static_cast<std::uint32_t
>(dequeued);
1203 if (dequeued == max) {
1207 ptr = ptr->next_prod();
1208 if (ptr ==
nullptr) {
1222 template<
typename U>
1223 inline bool try_dequeue_from_producer(producer_token_t
const &producer, U &item) {
1224 return static_cast<ExplicitProducer *
>(producer.producer)->dequeue(item);
1234 template<
typename It>
1236 try_dequeue_bulk_from_producer(producer_token_t
const &producer, It itemFirst,
size_t max) {
1237 return static_cast<ExplicitProducer *
>(producer.producer)->dequeue_bulk(itemFirst, max);
1247 size_t size_approx()
const {
1249 for (
auto ptr = producerListTail.load(std::memory_order_acquire);
1250 ptr !=
nullptr; ptr = ptr->next_prod()) {
1251 size += ptr->size_approx();
1260 static bool is_lock_free() {
1262 details::static_is_lock_free<bool>::value == 2 &&
1263 details::static_is_lock_free<size_t>::value == 2 &&
1264 details::static_is_lock_free<std::uint32_t>::value == 2 &&
1265 details::static_is_lock_free<index_t>::value == 2 &&
1266 details::static_is_lock_free<void *>::value == 2 &&
1267 details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value ==
1273 friend struct ProducerToken;
1274 friend struct ConsumerToken;
1275 friend struct ExplicitProducer;
1277 friend class ConcurrentQueueTests;
1279 enum AllocationMode {
1280 CanAlloc, CannotAlloc
1288 template<AllocationMode canAlloc,
typename U>
1289 inline bool inner_enqueue(producer_token_t
const &token, U &&element) {
1290 return static_cast<ExplicitProducer *
>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(
1291 std::forward<U>(element));
1294 template<AllocationMode canAlloc,
typename U>
1295 inline bool inner_enqueue(U &&element) {
1296 auto producer = get_or_add_implicit_producer();
1297 return producer ==
nullptr ?
false 1298 : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(
1299 std::forward<U>(element));
1302 template<AllocationMode canAlloc,
typename It>
1303 inline bool inner_enqueue_bulk(producer_token_t
const &token, It itemFirst,
size_t count) {
1304 return static_cast<ExplicitProducer *
>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(
1308 template<AllocationMode canAlloc,
typename It>
1309 inline bool inner_enqueue_bulk(It itemFirst,
size_t count) {
1310 auto producer = get_or_add_implicit_producer();
1311 return producer ==
nullptr ?
false 1312 : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(
// Re-targets a consumer token at the producer it should dequeue from next,
// based on the global explicit-consumer rotation offset. Returns false only
// when there are no producers at all (nothing to dequeue from).
inline bool update_current_producer_after_rotation(consumer_token_t &token) {
    // No producers exist yet and the token has never been assigned one.
    auto tail = producerListTail.load(std::memory_order_acquire);
    if (token.desiredProducer == nullptr && tail == nullptr) {
        return false;
    }
    auto prodCount = producerCount.load(std::memory_order_relaxed);
    auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
    if (details::unlikely(token.desiredProducer == nullptr)) {
        // First dequeue with this token: derive the starting producer from the
        // token's initial offset. The offset counts from the start of the list
        // but we traverse from the tail, hence the (prodCount - 1 - ...) flip.
        std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
        token.desiredProducer = tail;
        for (std::uint32_t i = 0; i != offset; ++i) {
            token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
            if (token.desiredProducer == nullptr) {
                // Wrap around to the tail when we run off the end of the list.
                token.desiredProducer = tail;
            }
        }
    }
    // Advance by however far the global rotation has moved since we last looked
    // (modulo the producer count, since a full lap is a no-op).
    std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
    if (delta >= prodCount) {
        delta = delta % prodCount;
    }
    for (std::uint32_t i = 0; i != delta; ++i) {
        token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
        if (token.desiredProducer == nullptr) {
            token.desiredProducer = tail;
        }
    }
    token.lastKnownGlobalOffset = globalOffset;
    token.currentProducer = token.desiredProducer;
    token.itemsConsumedFromCurrent = 0;
    return true;
}
// Intrusive node mix-in for FreeList<N>: N must ultimately provide these two
// fields. freeListRefs packs a reference count plus the SHOULD_BE_ON_FREELIST
// flag bit; freeListNext links nodes on the (LIFO) free list.
template<typename N>
struct FreeListNode {
    FreeListNode()
        : freeListRefs(0), freeListNext(nullptr) {}

    std::atomic<std::uint32_t> freeListRefs;
    std::atomic<N *> freeListNext;
};
// A simple CAS-based lock-free free list. Not the fastest thing in the world
// under heavy contention, but simple and correct (assuming nodes are never
// freed until after the free list is destroyed), and fairly speedy under low
// contention. N must inherit FreeListNode<N> (or provide equivalent fields).
template<typename N>
struct FreeList {
    FreeList()
        : freeListHead(nullptr) {}

    FreeList(FreeList &&other)
        : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) {
        other.freeListHead.store(nullptr, std::memory_order_relaxed);
    }

    void swap(FreeList &other) { details::swap_relaxed(freeListHead, other.freeListHead); }

    FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
    FreeList &operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;

    // Makes node available for re-use. The node is only physically linked into
    // the list once its reference count drops to zero (other threads may still
    // hold transient references from try_get).
    inline void add(N *node) {
#if MCDBGQ_NOLOCKFREE_FREELIST
        debug::DebugLock lock(mutex);
#endif
        // We know the should-be-on-freelist bit is 0 at this point, so it's
        // safe to set it using a fetch_add.
        if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
            // We were the last ones referencing this node; put it on the list.
            add_knowing_refcount_is_zero(node);
        }
    }

    // Pops a node off the list, or returns nullptr if the list is empty.
    inline N *try_get() {
#if MCDBGQ_NOLOCKFREE_FREELIST
        debug::DebugLock lock(mutex);
#endif
        auto head = freeListHead.load(std::memory_order_acquire);
        while (head != nullptr) {
            auto prevHead = head;
            auto refs = head->freeListRefs.load(std::memory_order_relaxed);
            // Take a reference on the head so its next pointer stays valid
            // while we attempt the pop; bail to a fresh head if the count is
            // zero (node is being taken off) or the CAS loses a race.
            if ((refs & REFS_MASK) == 0 ||
                !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire,
                                                            std::memory_order_relaxed)) {
                head = freeListHead.load(std::memory_order_acquire);
                continue;
            }

            // Reference acquired: next cannot change underneath us now.
            auto next = head->freeListNext.load(std::memory_order_relaxed);
            if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire,
                                                     std::memory_order_relaxed)) {
                // Got the node; since it was on the list, its should-be-on-
                // freelist bit must be clear (nobody can re-add it yet).
                assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

                // Drop two references: ours, and the list's own reference.
                head->freeListRefs.fetch_sub(2, std::memory_order_release);
                return head;
            }

            // The head changed on us; release the reference we took. If we
            // were the last referencer and the node is flagged for the list,
            // we are now responsible for actually adding it.
            refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
            if (refs == SHOULD_BE_ON_FREELIST + 1) {
                add_knowing_refcount_is_zero(prevHead);
            }
        }

        return nullptr;
    }

    // Useful for traversing the list when there's no contention
    // (e.g. to destroy remaining nodes during teardown).
    N *head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

private:
    inline void add_knowing_refcount_is_zero(N *node) {
        // Since the refcount is zero and we own the node, nobody else can be
        // modifying it concurrently -- but a try_get racer may still resurrect
        // a reference after our push fails, so we may have to loop.
        auto head = freeListHead.load(std::memory_order_relaxed);
        while (true) {
            node->freeListNext.store(head, std::memory_order_relaxed);
            // The list's reference (1) is established before the node becomes
            // reachable (release on the head CAS below pairs with try_get's acquire).
            node->freeListRefs.store(1, std::memory_order_release);
            if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release,
                                                      std::memory_order_relaxed)) {
                // Push failed; we can only retry once the refcount returns to
                // zero again (flag it and let the last referencer re-add it).
                if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
                    continue;
                }
            }
            return;
        }
    }

private:
    // Implemented like a stack, but where node order doesn't matter.
    std::atomic<N *> freeListHead;

    static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
    static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;

#if MCDBGQ_NOLOCKFREE_FREELIST
    debug::DebugMutex mutex;
#endif
};
// Compile-time tag selecting which emptiness-tracking strategy a Block uses:
// explicit producers may use per-slot empty flags (when BLOCK_SIZE is small
// enough), implicit producers always use the dequeue counter.
enum InnerQueueContext {
    implicit_context = 0, explicit_context = 1
};
// A fixed-size block of BLOCK_SIZE raw element slots, chained into per-producer
// lists and recycled through the global free list.
// NOTE(review): the "struct Block {" header and ctor signature were lost in this
// copy's extraction; reconstructed here -- confirm against the original.
struct Block {
    Block()
        : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr)
        , shouldBeOnFreeList(false), dynamicallyAllocated(true) {
#if MCDBGQ_TRACKMEM
        owner = nullptr;
#endif
    }

    // Returns true iff every slot in the block has been fully dequeued.
    template<InnerQueueContext context>
    inline bool is_empty() const {
        if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
            // Check per-slot flags.
            for (size_t i = 0; i < BLOCK_SIZE; ++i) {
                if (!emptyFlags[i].load(std::memory_order_relaxed)) {
                    return false;
                }
            }
            // Empty: acquire all memory effects that preceded the flag stores.
            std::atomic_thread_fence(std::memory_order_acquire);
            return true;
        } else {
            // Check the completed-dequeue counter.
            if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
                std::atomic_thread_fence(std::memory_order_acquire);
                return true;
            }
            assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
            return false;
        }
    }

    // Marks slot i as fully dequeued. Returns true iff the block just became
    // empty (only meaningful in the counter-based mode; the flag-based
    // explicit mode always returns false).
    template<InnerQueueContext context>
    inline bool set_empty(index_t i) {
        if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
            // Flags are stored in reverse slot order.
            assert(!emptyFlags[BLOCK_SIZE - 1 -
                               static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(
                std::memory_order_relaxed));
            emptyFlags[BLOCK_SIZE - 1 -
                       static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true,
                                                                                            std::memory_order_release);
            return false;
        } else {
            auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
            assert(prevVal < BLOCK_SIZE);
            return prevVal == BLOCK_SIZE - 1;
        }
    }

    // Marks count contiguous slots starting at i as dequeued (assumes no
    // wrap-around and count > 0). Returns true iff the block just became empty
    // (counter mode only).
    template<InnerQueueContext context>
    inline bool set_many_empty(index_t i, size_t count) {
        if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
            // One fence up front instead of a release store per flag.
            std::atomic_thread_fence(std::memory_order_release);
            i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
            for (size_t j = 0; j != count; ++j) {
                assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
                emptyFlags[i + j].store(true, std::memory_order_relaxed);
            }
            return false;
        } else {
            auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
            assert(prevVal + count <= BLOCK_SIZE);
            return prevVal + count == BLOCK_SIZE;
        }
    }

    // Marks the whole block as empty (used when a freshly requisitioned block
    // is pre-filled by a bulk enqueue).
    template<InnerQueueContext context>
    inline void set_all_empty() {
        if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
            for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                emptyFlags[i].store(true, std::memory_order_relaxed);
            }
        } else {
            elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
        }
    }

    // Resets the block's bookkeeping so it can be (re)filled.
    template<InnerQueueContext context>
    inline void reset_empty() {
        if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
            for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                emptyFlags[i].store(false, std::memory_order_relaxed);
            }
        } else {
            elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
        }
    }

    // Returns a pointer to the (raw, possibly unconstructed) element slot for
    // the given global index; indices wrap within the block.
    inline T *operator[](index_t idx) MOODYCAMEL_NOEXCEPT {
        return static_cast<T *>(static_cast<void *>(elements)) +
               static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
    }

    inline T const *operator[](index_t idx) const MOODYCAMEL_NOEXCEPT {
        return static_cast<T const *>(static_cast<void const *>(elements)) +
               static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
    }

private:
    static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value,
                  "The queue does not support super-aligned types at this time");
    // Raw storage; the union with max_align_t guarantees suitable alignment.
    union {
        char elements[sizeof(T) * BLOCK_SIZE];
        details::max_align_t dummy;
    };
public:
    Block *next;
    std::atomic<size_t> elementsCompletelyDequeued;
    // Per-slot flags only exist when the flag-based strategy is usable.
    std::atomic<bool> emptyFlags[
        BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
    std::atomic<std::uint32_t> freeListRefs;
    std::atomic<Block *> freeListNext;
    std::atomic<bool> shouldBeOnFreeList;
    bool dynamicallyAllocated;  // Perhaps a member of the initial-pool array instead

#if MCDBGQ_TRACKMEM
    void *owner;
#endif
};
static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value,
              "Internal error: Blocks must be at least as aligned as the type they are wrapping");
// Common base for the two producer kinds; dispatches dequeue operations to the
// concrete type via the isExplicit flag (cheaper than a virtual call).
struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
    ProducerBase(ConcurrentQueue *parent_, bool isExplicit_) :
        tailIndex(0), headIndex(0), dequeueOptimisticCount(0), dequeueOvercommit(0), tailBlock(
            nullptr), isExplicit(isExplicit_), parent(parent_) {
    }

    virtual ~ProducerBase() {};

    template<typename U>
    inline bool dequeue(U &element) {
        if (isExplicit) {
            return static_cast<ExplicitProducer *>(this)->dequeue(element);
        } else {
            return static_cast<ImplicitProducer *>(this)->dequeue(element);
        }
    }

    template<typename It>
    inline size_t dequeue_bulk(It &itemFirst, size_t max) {
        if (isExplicit) {
            return static_cast<ExplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
        } else {
            return static_cast<ImplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
        }
    }

    // Next producer in the queue's global producer list.
    inline ProducerBase *next_prod() const { return static_cast<ProducerBase *>(next); }

    // Approximate number of undequeued elements in this sub-queue.
    inline size_t size_approx() const {
        auto tail = tailIndex.load(std::memory_order_relaxed);
        auto head = headIndex.load(std::memory_order_relaxed);
        return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
    }

    inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }

protected:
    std::atomic<index_t> tailIndex;  // Where to enqueue to next
    std::atomic<index_t> headIndex;  // Where to dequeue from next

    // Optimistic-count / overcommit pair used by the lock-free dequeue protocol.
    std::atomic<index_t> dequeueOptimisticCount;
    std::atomic<index_t> dequeueOvercommit;

    Block *tailBlock;

public:
    bool isExplicit;
    ConcurrentQueue *parent;

protected:
#if MCDBGQ_TRACKMEM
    friend struct MemStats;
#endif
};
// A single-producer, multi-consumer sub-queue used with explicit producer
// tokens. Elements live in a circular linked list of Blocks, located for
// consumers through a (growable) circular block index.
// NOTE(review): this copy's extraction dropped brace-only and comment-only
// lines; the bodies below are the reconstructed full statements -- confirm
// against the pristine header.
struct ExplicitProducer : public ProducerBase {
    explicit ExplicitProducer(ConcurrentQueue *parent)
        : ProducerBase(parent, true), blockIndex(nullptr), pr_blockIndexSlotsUsed(0), pr_blockIndexSize(
        EXPLICIT_INITIAL_INDEX_SIZE >> 1), pr_blockIndexFront(0), pr_blockIndexEntries(nullptr)
        , pr_blockIndexRaw(nullptr) {
        // Size the first block index to cover the initial block pool if that
        // is larger than the default (sizes double on each reallocation).
        size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
        if (poolBasedIndexSize > pr_blockIndexSize) {
            pr_blockIndexSize = poolBasedIndexSize;
        }

        // Allocates an index with double the current size, i.e. the real
        // initial capacity.
        new_block_index(0);
    }

    ~ExplicitProducer() {
        // Destruct any elements not yet dequeued. Since we're in the
        // destructor, all elements are either completely dequeued or
        // completely not (no half-finished operations in flight).
        if (this->tailBlock != nullptr) {  // implies a block index exists too
            // First find the block that's partially dequeued, if any.
            Block *halfDequeuedBlock = nullptr;
            if ((this->headIndex.load(std::memory_order_relaxed) &
                 static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
                // Head is not on a block boundary, so some block is partially
                // dequeued; scan the index for it.
                size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
                while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE,
                                                            this->headIndex.load(
                                                                std::memory_order_relaxed))) {
                    i = (i + 1) & (pr_blockIndexSize - 1);
                }
                assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base,
                                                            this->headIndex.load(
                                                                std::memory_order_relaxed)));
                halfDequeuedBlock = pr_blockIndexEntries[i].block;
            }

            // Start at the head block (the first iteration's ->next takes us
            // from the tail to the head of the circular list).
            auto block = this->tailBlock;
            do {
                block = block->next;
                if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                    continue;
                }

                size_t i = 0;  // Offset into block
                if (block == halfDequeuedBlock) {
                    i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) &
                                            static_cast<index_t>(BLOCK_SIZE - 1));
                }

                // Walk through all the items in the block; if this is the tail
                // block we stop at the tail index instead of the block's end.
                auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) &
                                       static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE
                                      : static_cast<size_t>(
                        this->tailIndex.load(std::memory_order_relaxed) &
                        static_cast<index_t>(BLOCK_SIZE - 1));
                while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
                    (*block)[i++]->~T();
                }
            } while (block != this->tailBlock);
        }

        // Destroy all blocks that we own.
        if (this->tailBlock != nullptr) {
            auto block = this->tailBlock;
            do {
                auto nextBlock = block->next;
                if (block->dynamicallyAllocated) {
                    destroy(block);
                } else {
                    this->parent->add_block_to_free_list(block);
                }
                block = nextBlock;
            } while (block != this->tailBlock);
        }

        // Destroy the chain of block indices (each links to its predecessor).
        auto header = static_cast<BlockIndexHeader *>(pr_blockIndexRaw);
        while (header != nullptr) {
            auto prev = static_cast<BlockIndexHeader *>(header->prev);
            header->~BlockIndexHeader();
            (Traits::free)(header);
            header = prev;
        }
    }

    // Enqueues one element; may allocate a new block (and/or grow the block
    // index) depending on allocMode. Returns false when full / cannot allocate.
    template<AllocationMode allocMode, typename U>
    inline bool enqueue(U &&element) {
        index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        index_t newTailIndex = 1 + currentTailIndex;
        if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
            // We reached the end of a block, start a new one.
            auto startBlock = this->tailBlock;
            auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
            if (this->tailBlock != nullptr &&
                this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                // We can re-use the block ahead of us, it's empty!
                this->tailBlock = this->tailBlock->next;
                this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                // The block index is guaranteed to have room for this entry
                // (we conceptually overwrite the slot of the block we recycle).
            } else {
                // The head we load here is <= the true current head (it only
                // moves forward), which is all the check below needs.
                auto head = this->headIndex.load(std::memory_order_relaxed);
                assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
                    || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                        (MAX_SUBQUEUE_SIZE == 0 ||
                         MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                    // Not enough leeway: the tail could lap the head before the
                    // block fills, or the size cap would be exceeded.
                    return false;
                }
                // We need a new block; make sure the block index has room.
                if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
                    if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
                        return false;
                    }
                }

                // Insert a new block into the circular linked list.
                auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                if (newBlock == nullptr) {
                    return false;
                }
#if MCDBGQ_TRACKMEM
                newBlock->owner = this;
#endif
                newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                if (this->tailBlock == nullptr) {
                    newBlock->next = newBlock;
                } else {
                    newBlock->next = this->tailBlock->next;
                    this->tailBlock->next = newBlock;
                }
                this->tailBlock = newBlock;
                ++pr_blockIndexSlotsUsed;
            }

            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
                // The constructor may throw; construct before publishing so a
                // throw leaves the queue state unchanged.
                MOODYCAMEL_TRY {
                    new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
                }
                MOODYCAMEL_CATCH (...) {
                    // Revert to the original block, but keep the new block
                    // around for next time.
                    pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                    this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
                    MOODYCAMEL_RETHROW;
                }
            } else {
                (void) startBlock;
                (void) originalBlockIndexSlotsUsed;
            }

            // Publish the block in the block index.
            auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
            entry.base = currentTailIndex;
            entry.block = this->tailBlock;
            blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront,
                                                                    std::memory_order_release);
            pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);

            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
                // Element already constructed above (throwing path).
                this->tailIndex.store(newTailIndex, std::memory_order_release);
                return true;
            }
        }

        // Enqueue (noexcept-construct directly into the slot).
        new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }

    // Dequeues one element into element; returns false if the sub-queue
    // appeared empty.
    template<typename U>
    bool dequeue(U &element) {
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        if (details::circular_less_than<index_t>(
            this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
            // Might be something to dequeue, let's give it a try.
            // The fence synchronizes with the enqueuer's tail release-store so
            // that element memory effects are visible before we commit.
            std::atomic_thread_fence(std::memory_order_acquire);

            // Optimistically claim an element; the overcommit counter undoes
            // claims that turn out to exceed the tail.
            auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
            assert(overcommit <= myDequeueCount);

            // Re-load tail (acquire): it is the same or greater than before.
            tail = this->tailIndex.load(std::memory_order_acquire);
            if (details::likely(
                details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                // Guaranteed at least one element; claim the actual index.
                // acq_rel so we see the memory effects for whichever element
                // this increment actually lands on.
                auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                // Determine which block the element is in.
                auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                // Signed subtraction/division so block offsets stay correct
                // across index wrap-around.
                auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
                auto offset = static_cast<size_t>(
                    static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) /
                    BLOCK_SIZE);
                auto block = localBlockIndex->entries[(localBlockIndexHead + offset) &
                                                      (localBlockIndex->size - 1)].block;

                // Dequeue.
                auto &el = *((*block)[index]);
                if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
                    // The assignment may throw: a scope guard guarantees the
                    // element is still destroyed and the slot marked empty.
                    struct Guard {
                        Block *block;
                        index_t index;

                        ~Guard() {
                            (*block)[index]->~T();
                            block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                        }
                    } guard = {block, index};

                    element = std::move(el);
                } else {
                    element = std::move(el);
                    el.~T();
                    block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                }

                return true;
            } else {
                // Wasn't anything to dequeue after all; make the effective
                // dequeue count eventually consistent (release so this
                // happens-before subsequent dequeue attempts).
                this->dequeueOvercommit.fetch_add(1,
                                                  std::memory_order_release);
            }
        }

        return false;
    }

    // Bulk-enqueues count elements from itemFirst, all-or-nothing: blocks are
    // pre-allocated first so either every element is enqueued or none are.
    template<AllocationMode allocMode, typename It>
    bool enqueue_bulk(It itemFirst, size_t count) {
        index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        auto startBlock = this->tailBlock;
        auto originalBlockIndexFront = pr_blockIndexFront;
        auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;

        Block *firstAllocatedBlock = nullptr;

        // Figure out how many blocks we'll need, and acquire them.
        size_t blockBaseDiff =
            ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
            ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
        index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
        if (blockBaseDiff > 0) {
            // First recycle as many already-owned empty blocks ahead of the
            // tail as possible.
            while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
                   this->tailBlock->next != firstAllocatedBlock &&
                   this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                this->tailBlock = this->tailBlock->next;
                firstAllocatedBlock =
                    firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;

                auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                entry.base = currentTailIndex;
                entry.block = this->tailBlock;
                pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
            }

            // Then allocate whatever is still needed from the block pool.
            while (blockBaseDiff > 0) {
                blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                auto head = this->headIndex.load(std::memory_order_relaxed);
                assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
                            (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                             (MAX_SUBQUEUE_SIZE == 0 ||
                              MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
                if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
                    if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
                        // Failed: undo index bookkeeping (recycled blocks stay
                        // in our list for later use).
                        pr_blockIndexFront = originalBlockIndexFront;
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                        return false;
                    }

                    // new_block_index advanced pr_blockIndexFront; keep our
                    // rollback value consistent with the new index.
                    originalBlockIndexFront = originalBlockIndexSlotsUsed;
                }

                auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                if (newBlock == nullptr) {
                    pr_blockIndexFront = originalBlockIndexFront;
                    pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                    this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                    return false;
                }

#if MCDBGQ_TRACKMEM
                newBlock->owner = this;
#endif
                // Marked all-empty for now so the recycling scan above doesn't
                // pick it up; reset before filling (below).
                newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
                if (this->tailBlock == nullptr) {
                    newBlock->next = newBlock;
                } else {
                    newBlock->next = this->tailBlock->next;
                    this->tailBlock->next = newBlock;
                }
                this->tailBlock = newBlock;
                firstAllocatedBlock =
                    firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;

                ++pr_blockIndexSlotsUsed;

                auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                entry.base = currentTailIndex;
                entry.block = this->tailBlock;
                pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
            }

            // All allocations succeeded: reset emptiness on the new blocks.
            auto block = firstAllocatedBlock;
            while (true) {
                block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                if (block == this->tailBlock) {
                    break;
                }
                block = block->next;
            }

            // If construction cannot throw, publish the index front eagerly.
            if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
                                         new(nullptr) T(details::deref_noexcept(itemFirst)))) {
                blockIndex.load(std::memory_order_relaxed)->front.store(
                    (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
            }
        }

        // Enqueue, one block at a time.
        index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
        currentTailIndex = startTailIndex;
        auto endBlock = this->tailBlock;
        this->tailBlock = startBlock;
        assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
               firstAllocatedBlock != nullptr || count == 0);
        if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
            firstAllocatedBlock != nullptr) {
            this->tailBlock = firstAllocatedBlock;
        }
        while (true) {
            auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
                             static_cast<index_t>(BLOCK_SIZE);
            if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
                stopIndex = newTailIndex;
            }
            if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
                                         new(nullptr) T(details::deref_noexcept(itemFirst)))) {
                while (currentTailIndex != stopIndex) {
                    new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
                }
            } else {
                MOODYCAMEL_TRY {
                    while (currentTailIndex != stopIndex) {
                        // Must use the copy constructor (disable moving at
                        // compile time via nomove_if) because on a throw we
                        // revert the whole bulk operation, and a moved-from
                        // source cannot be restored.
                        new((*this->tailBlock)[currentTailIndex]) T(
                            details::nomove_if<(bool) !MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
                                                                                new(nullptr) T(
                                                                                    details::deref_noexcept(
                                                                                        itemFirst)))>::eval(
                                *itemFirst));
                        ++currentTailIndex;
                        ++itemFirst;
                    }
                }
                MOODYCAMEL_CATCH (...) {
                    // A constructor threw: destroy everything constructed so
                    // far and revert the bulk operation (allocated blocks stay
                    // in our linked list for later).
                    auto constructedStopIndex = currentTailIndex;
                    auto lastBlockEnqueued = this->tailBlock;

                    pr_blockIndexFront = originalBlockIndexFront;
                    pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                    this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;

                    if (!details::is_trivially_destructible<T>::value) {
                        auto block = startBlock;
                        if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                            block = firstAllocatedBlock;
                        }
                        currentTailIndex = startTailIndex;
                        while (true) {
                            stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
                                        static_cast<index_t>(BLOCK_SIZE);
                            if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                                stopIndex = constructedStopIndex;
                            }
                            while (currentTailIndex != stopIndex) {
                                (*block)[currentTailIndex++]->~T();
                            }
                            if (block == lastBlockEnqueued) {
                                break;
                            }
                            block = block->next;
                        }
                    }
                    MOODYCAMEL_RETHROW;
                }
            }

            if (this->tailBlock == endBlock) {
                assert(currentTailIndex == newTailIndex);
                break;
            }
            this->tailBlock = this->tailBlock->next;
        }

        // Throwing-ctor path publishes the index front only after everything
        // was successfully constructed.
        if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
                                      new(nullptr) T(details::deref_noexcept(itemFirst))) &&
            firstAllocatedBlock != nullptr) {
            blockIndex.load(std::memory_order_relaxed)->front.store(
                (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
        }

        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }

    // Bulk-dequeues up to max elements into itemFirst; returns the number
    // actually dequeued (possibly 0).
    template<typename It>
    size_t dequeue_bulk(It &itemFirst, size_t max) {
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(
            std::memory_order_relaxed) - overcommit));
        if (details::circular_less_than<size_t>(0, desiredCount)) {
            desiredCount = desiredCount < max ? desiredCount : max;
            std::atomic_thread_fence(std::memory_order_acquire);

            // Optimistically claim desiredCount elements (see dequeue()).
            auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount,
                                                                         std::memory_order_relaxed);
            assert(overcommit <= myDequeueCount);

            tail = this->tailIndex.load(std::memory_order_acquire);
            auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
            if (details::circular_less_than<size_t>(0, actualCount)) {
                actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                if (actualCount < desiredCount) {
                    // Give back the part of the claim that exceeded the tail.
                    this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
                                                      std::memory_order_release);
                }

                // Claim the actual index range (guaranteed <= tail).
                auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                // Determine which block the first element is in.
                auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
                auto offset = static_cast<size_t>(
                    static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) /
                    BLOCK_SIZE);
                auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);

                // Iterate the blocks and dequeue.
                auto index = firstIndex;
                do {
                    auto firstIndexInBlock = index;
                    auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
                                    static_cast<index_t>(BLOCK_SIZE);
                    endIndex = details::circular_less_than<index_t>(
                        firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex +
                                                                                    static_cast<index_t>(actualCount)
                                                                                  : endIndex;
                    auto block = localBlockIndex->entries[indexIndex].block;
                    if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, details::deref_noexcept(itemFirst) = std::move(
                        (*(*block)[index])))) {
                        while (index != endIndex) {
                            auto &el = *((*block)[index]);
                            *itemFirst++ = std::move(el);
                            el.~T();
                            ++index;
                        }
                    } else {
                        MOODYCAMEL_TRY {
                            while (index != endIndex) {
                                auto &el = *((*block)[index]);
                                *itemFirst = std::move(el);
                                ++itemFirst;
                                el.~T();
                                ++index;
                            }
                        }
                        MOODYCAMEL_CATCH (...) {
                            // Too late to revert the dequeue, but we can still
                            // destroy the remaining claimed elements and keep
                            // the block bookkeeping consistent before
                            // propagating the exception.
                            do {
                                block = localBlockIndex->entries[indexIndex].block;
                                while (index != endIndex) {
                                    (*block)[index++]->~T();
                                }
                                block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
                                    firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                                indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);

                                firstIndexInBlock = index;
                                endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
                                           static_cast<index_t>(BLOCK_SIZE);
                                endIndex = details::circular_less_than<index_t>(
                                    firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex +
                                                                                                static_cast<index_t>(actualCount)
                                                                                              : endIndex;
                            } while (index != firstIndex + actualCount);

                            MOODYCAMEL_RETHROW;
                        }
                    }
                    block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
                        firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                    indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
                } while (index != firstIndex + actualCount);

                return actualCount;
            } else {
                // Wasn't anything to dequeue after all; make the effective
                // dequeue count eventually consistent.
                this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
            }
        }

        return 0;
    }

private:
    // One entry per block: the global index where the block starts, and the
    // block itself.
    struct BlockIndexEntry {
        index_t base;
        Block *block;
    };

    struct BlockIndexHeader {
        size_t size;
        std::atomic<size_t> front;  // Current slot (not next, like pr_blockIndexFront)
        BlockIndexEntry *entries;
        void *prev;  // Previous (smaller) index allocation, freed in the dtor
    };

    // Allocates a block index with double the current capacity, copying over
    // the live entries; the old index is kept (linked via prev) because
    // consumers may still be reading it.
    bool new_block_index(size_t numberOfFilledSlotsToExpose) {
        auto prevBlockSizeMask = pr_blockIndexSize - 1;

        // Create the new block index.
        pr_blockIndexSize <<= 1;
        auto newRawPtr = static_cast<char *>((Traits::malloc)(
            sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
            sizeof(BlockIndexEntry) * pr_blockIndexSize));
        if (newRawPtr == nullptr) {
            pr_blockIndexSize >>= 1;  // Reset to allow a graceful retry later
            return false;
        }

        auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(
            newRawPtr + sizeof(BlockIndexHeader)));

        // Copy in all the old entries, if any.
        size_t j = 0;
        if (pr_blockIndexSlotsUsed != 0) {
            auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
            do {
                newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
                i = (i + 1) & prevBlockSizeMask;
            } while (i != pr_blockIndexFront);
        }

        // Update everything and publish the new index last (release pairs with
        // consumers' acquire load of blockIndex).
        auto header = new(newRawPtr) BlockIndexHeader;
        header->size = pr_blockIndexSize;
        header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
        header->entries = newBlockIndexEntries;
        header->prev = pr_blockIndexRaw;  // chain old index for later cleanup

        pr_blockIndexFront = j;
        pr_blockIndexEntries = newBlockIndexEntries;
        pr_blockIndexRaw = newRawPtr;
        blockIndex.store(header, std::memory_order_release);

        return true;
    }

private:
    std::atomic<BlockIndexHeader *> blockIndex;

    // For use by the producer only -- consumers must use the versions
    // reachable through blockIndex.
    size_t pr_blockIndexSlotsUsed;
    size_t pr_blockIndexSize;
    size_t pr_blockIndexFront;  // Next slot (not current)
    BlockIndexEntry *pr_blockIndexEntries;
    void *pr_blockIndexRaw;

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
    ExplicitProducer* nextExplicitProducer;
private:
#endif

#if MCDBGQ_TRACKMEM
    friend struct MemStats;
#endif
};
// Sub-queue created automatically, one per enqueueing thread, when no explicit
// producer token is used. (Struct continues below; this block covers the
// header and constructor only.)
struct ImplicitProducer : public ProducerBase {
    ImplicitProducer(ConcurrentQueue *parent)
        // Block index starts null; new_block_index() allocates the first one.
        : ProducerBase(parent, false), nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE), blockIndex(
        nullptr) {
        new_block_index();
    }
    // Destroys all remaining (undequeued) elements, returns blocks to the
    // parent's free list, and frees the chain of block indices. Safe because
    // no operations can be in flight during destruction, so remaining elements
    // are contiguous across contiguous blocks.
    ~ImplicitProducer() {
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
        // Unregister ourselves from thread-termination notification (only if
        // still active; an already-recycled producer has no subscription).
        if (!this->inactive.load(std::memory_order_relaxed)) {
            details::ThreadExitNotifier::unsubscribe(&threadExitListener);
        }
#endif

        // Destroy all remaining elements.
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto index = this->headIndex.load(std::memory_order_relaxed);
        Block *block = nullptr;
        assert(index == tail || details::circular_less_than(index, tail));
        // If we enter the loop below, the tail block is handled there and must
        // still be freed afterwards.
        bool forceFreeLastBlock = index != tail;
        while (index != tail) {
            if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
                if (block != nullptr) {
                    // Done with the previous block; recycle it.
                    this->parent->add_block_to_free_list(block);
                }

                block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
            }

            ((*block)[index])->~T();
            ++index;
        }
        // Even an empty queue can still hold one block not yet on the free
        // list (unless the head reached the exact end of it, in which case the
        // tail is poised to allocate a fresh block).
        if (this->tailBlock != nullptr &&
            (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
            this->parent->add_block_to_free_list(this->tailBlock);
        }

        // Destroy the chain of block indices.
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
        if (localBlockIndex != nullptr) {
            for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
                localBlockIndex->index[i]->~BlockIndexEntry();
            }
            do {
                auto prev = localBlockIndex->prev;
                localBlockIndex->~BlockIndexHeader();
                (Traits::free)(localBlockIndex);
                localBlockIndex = prev;
            } while (localBlockIndex != nullptr);
        }
    }
    // Enqueues one element; on a block boundary, inserts a new block-index
    // entry and requisitions a new block (subject to allocMode). Returns false
    // when full or when allocation is disallowed/fails.
    template<AllocationMode allocMode, typename U>
    inline bool enqueue(U &&element) {
        index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        index_t newTailIndex = 1 + currentTailIndex;
        if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
            // We reached the end of a block, start a new one.
            auto head = this->headIndex.load(std::memory_order_relaxed);
            assert(!details::circular_less_than<index_t>(currentTailIndex, head));
            if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
                (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                 (MAX_SUBQUEUE_SIZE == 0 ||
                  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                return false;
            }
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
            debug::DebugLock lock(mutex);
#endif
            // Find out where we'll be inserting this block in the block index.
            BlockIndexEntry *idxEntry;
            if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
                return false;
            }

            // Get ahold of a new block.
            auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
            if (newBlock == nullptr) {
                // Undo the index insertion so the slot can be reused.
                rewind_block_index_tail();
                idxEntry->value.store(nullptr, std::memory_order_relaxed);
                return false;
            }
#if MCDBGQ_TRACKMEM
            newBlock->owner = this;
#endif
            newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();

            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
                // Construction may throw: construct before publishing the block
                // so a throw leaves no trace.
                MOODYCAMEL_TRY {
                    new((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
                }
                MOODYCAMEL_CATCH (...) {
                    rewind_block_index_tail();
                    idxEntry->value.store(nullptr, std::memory_order_relaxed);
                    this->parent->add_block_to_free_list(newBlock);
                    MOODYCAMEL_RETHROW;
                }
            }

            // Publish the new block in the index.
            idxEntry->value.store(newBlock, std::memory_order_relaxed);

            this->tailBlock = newBlock;

            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
                // Element already constructed above (throwing path).
                this->tailIndex.store(newTailIndex, std::memory_order_release);
                return true;
            }
        }

        // Enqueue (noexcept-construct directly into the slot).
        new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }
    // Dequeues one element into element; returns false if the sub-queue
    // appeared empty. See ExplicitProducer::dequeue for the rationale behind
    // the optimistic-count / overcommit protocol.
    template<typename U>
    bool dequeue(U &element) {
        index_t tail = this->tailIndex.load(std::memory_order_relaxed);
        index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        if (details::circular_less_than<index_t>(
            this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
            std::atomic_thread_fence(std::memory_order_acquire);

            index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1,
                                                                            std::memory_order_relaxed);
            assert(overcommit <= myDequeueCount);
            tail = this->tailIndex.load(std::memory_order_acquire);
            if (details::likely(
                details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                // Determine which block the element is in.
                auto entry = get_block_index_entry_for_index(index);

                // Dequeue.
                auto block = entry->value.load(std::memory_order_relaxed);
                auto &el = *((*block)[index]);

                if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                    // Debug-only: taking the lock on every dequeue is
                    // sub-optimal but this is purely debug code.
                    debug::DebugLock lock(producer->mutex);
#endif
                    // The assignment may throw: a scope guard guarantees the
                    // element is destroyed, the slot marked empty, and a fully
                    // drained block returned to the free list.
                    struct Guard {
                        Block *block;
                        index_t index;
                        BlockIndexEntry *entry;
                        ConcurrentQueue *parent;

                        ~Guard() {
                            (*block)[index]->~T();
                            if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                                entry->value.store(nullptr, std::memory_order_relaxed);
                                parent->add_block_to_free_list(block);
                            }
                        }
                    } guard = {block, index, entry, this->parent};

                    element = std::move(el);
                } else {
                    element = std::move(el);
                    el.~T();

                    if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                        {
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                            debug::DebugLock lock(mutex);
#endif
                            // Remove the drained block from the index before
                            // recycling it into the global free pool.
                            entry->value.store(nullptr, std::memory_order_relaxed);
                        }
                        this->parent->add_block_to_free_list(block);  // releases the above store
                    }
                }

                return true;
            } else {
                this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
            }
        }

        return false;
    }
2597 template<AllocationMode allocMode,
typename It>
2598 bool enqueue_bulk(It itemFirst,
size_t count) {
2608 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2609 auto startBlock = this->tailBlock;
2610 Block *firstAllocatedBlock =
nullptr;
2611 auto endBlock = this->tailBlock;
2614 size_t blockBaseDiff =
2615 ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2616 ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2617 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2618 if (blockBaseDiff > 0) {
2619 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2620 debug::DebugLock lock(mutex);
2623 blockBaseDiff -=
static_cast<index_t
>(BLOCK_SIZE);
2624 currentTailIndex +=
static_cast<index_t
>(BLOCK_SIZE);
2627 BlockIndexEntry *idxEntry =
nullptr;
2629 bool indexInserted =
false;
2630 auto head = this->headIndex.load(std::memory_order_relaxed);
2631 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2632 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2633 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2634 (MAX_SUBQUEUE_SIZE == 0 ||
2635 MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2637 !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) ||
2638 (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) ==
2642 if (indexInserted) {
2643 rewind_block_index_tail();
2644 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2646 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2647 for (
auto block = firstAllocatedBlock; block !=
nullptr; block = block->next) {
2648 currentTailIndex +=
static_cast<index_t
>(BLOCK_SIZE);
2649 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2650 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2651 rewind_block_index_tail();
2653 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2654 this->tailBlock = startBlock;
2660 newBlock->owner =
this;
2662 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2663 newBlock->next =
nullptr;
2666 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2670 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2671 firstAllocatedBlock !=
nullptr) {
2672 assert(this->tailBlock !=
nullptr);
2673 this->tailBlock->next = newBlock;
2675 this->tailBlock = newBlock;
2676 endBlock = newBlock;
2677 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? newBlock : firstAllocatedBlock;
2678 }
while (blockBaseDiff > 0);
2682 index_t newTailIndex = startTailIndex +
static_cast<index_t
>(count);
2683 currentTailIndex = startTailIndex;
2684 this->tailBlock = startBlock;
2685 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2686 firstAllocatedBlock !=
nullptr || count == 0);
2687 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
2688 firstAllocatedBlock !=
nullptr) {
2689 this->tailBlock = firstAllocatedBlock;
2692 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2693 static_cast<index_t
>(BLOCK_SIZE);
2694 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2695 stopIndex = newTailIndex;
2697 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2698 new(
nullptr) T(details::deref_noexcept(itemFirst)))) {
2699 while (currentTailIndex != stopIndex) {
2700 new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2704 while (currentTailIndex != stopIndex) {
2705 new((*this->tailBlock)[currentTailIndex]) T(
2706 details::nomove_if<(
bool) !MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2708 details::deref_noexcept(
2709 itemFirst)))>::eval(
2715 MOODYCAMEL_CATCH (...) {
2716 auto constructedStopIndex = currentTailIndex;
2717 auto lastBlockEnqueued = this->tailBlock;
2719 if (!details::is_trivially_destructible<T>::value) {
2720 auto block = startBlock;
2721 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2722 block = firstAllocatedBlock;
2724 currentTailIndex = startTailIndex;
2726 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2727 static_cast<index_t
>(BLOCK_SIZE);
2728 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2729 stopIndex = constructedStopIndex;
2731 while (currentTailIndex != stopIndex) {
2732 (*block)[currentTailIndex++]->~T();
2734 if (block == lastBlockEnqueued) {
2737 block = block->next;
2741 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2742 for (
auto block = firstAllocatedBlock; block !=
nullptr; block = block->next) {
2743 currentTailIndex +=
static_cast<index_t
>(BLOCK_SIZE);
2744 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2745 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2746 rewind_block_index_tail();
2748 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2749 this->tailBlock = startBlock;
2754 if (this->tailBlock == endBlock) {
2755 assert(currentTailIndex == newTailIndex);
2758 this->tailBlock = this->tailBlock->next;
2760 this->tailIndex.store(newTailIndex, std::memory_order_release);
2764 template<
typename It>
2765 size_t dequeue_bulk(It &itemFirst,
size_t max) {
2766 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2767 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2768 auto desiredCount =
static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(
2769 std::memory_order_relaxed) - overcommit));
2770 if (details::circular_less_than<size_t>(0, desiredCount)) {
2771 desiredCount = desiredCount < max ? desiredCount : max;
2772 std::atomic_thread_fence(std::memory_order_acquire);
2774 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount,
2775 std::memory_order_relaxed);
2776 assert(overcommit <= myDequeueCount);
2778 tail = this->tailIndex.load(std::memory_order_acquire);
2779 auto actualCount =
static_cast<size_t>(tail - (myDequeueCount - overcommit));
2780 if (details::circular_less_than<size_t>(0, actualCount)) {
2781 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2782 if (actualCount < desiredCount) {
2783 this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2784 std::memory_order_release);
2789 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2792 auto index = firstIndex;
2793 BlockIndexHeader *localBlockIndex;
2794 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2796 auto blockStartIndex = index;
2798 (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
static_cast<index_t
>(BLOCK_SIZE);
2799 endIndex = details::circular_less_than<index_t>(
2800 firstIndex +
static_cast<index_t
>(actualCount), endIndex) ? firstIndex +
2801 static_cast<index_t
>(actualCount)
2804 auto entry = localBlockIndex->index[indexIndex];
2805 auto block = entry->value.load(std::memory_order_relaxed);
2806 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, details::deref_noexcept(itemFirst) = std::move(
2807 (*(*block)[index])))) {
2808 while (index != endIndex) {
2809 auto &el = *((*block)[index]);
2810 *itemFirst++ = std::move(el);
2816 while (index != endIndex) {
2817 auto &el = *((*block)[index]);
2818 *itemFirst = std::move(el);
2824 MOODYCAMEL_CATCH (...) {
2826 entry = localBlockIndex->index[indexIndex];
2827 block = entry->value.load(std::memory_order_relaxed);
2828 while (index != endIndex) {
2829 (*block)[index++]->~T();
2832 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
2833 blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2834 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2835 debug::DebugLock lock(mutex);
2837 entry->value.store(
nullptr, std::memory_order_relaxed);
2838 this->parent->add_block_to_free_list(block);
2840 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2842 blockStartIndex = index;
2843 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2844 static_cast<index_t
>(BLOCK_SIZE);
2845 endIndex = details::circular_less_than<index_t>(
2846 firstIndex +
static_cast<index_t
>(actualCount), endIndex) ? firstIndex +
2847 static_cast<index_t
>(actualCount)
2849 }
while (index != firstIndex + actualCount);
2854 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
2855 blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2857 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2858 debug::DebugLock lock(mutex);
2862 entry->value.store(
nullptr, std::memory_order_relaxed);
2864 this->parent->add_block_to_free_list(block);
2866 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2867 }
while (index != firstIndex + actualCount);
2871 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2880 static const index_t INVALID_BLOCK_BASE = 1;
2882 struct BlockIndexEntry {
2883 std::atomic<index_t> key;
2884 std::atomic<Block *> value;
2887 struct BlockIndexHeader {
2889 std::atomic<size_t> tail;
2890 BlockIndexEntry *entries;
2891 BlockIndexEntry **index;
2892 BlockIndexHeader *prev;
2895 template<AllocationMode allocMode>
2896 inline bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex) {
2897 auto localBlockIndex = blockIndex.load(
2898 std::memory_order_relaxed);
2899 if (localBlockIndex ==
nullptr) {
2902 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
2903 (localBlockIndex->capacity - 1);
2904 idxEntry = localBlockIndex->index[newTail];
2905 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2906 idxEntry->value.load(std::memory_order_relaxed) ==
nullptr) {
2908 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2909 localBlockIndex->tail.store(newTail, std::memory_order_release);
2914 if (allocMode == CannotAlloc || !new_block_index()) {
2917 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2918 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
2919 (localBlockIndex->capacity - 1);
2920 idxEntry = localBlockIndex->index[newTail];
2921 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2922 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2923 localBlockIndex->tail.store(newTail, std::memory_order_release);
2927 inline void rewind_block_index_tail() {
2928 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2929 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) &
2930 (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2933 inline BlockIndexEntry *get_block_index_entry_for_index(index_t index)
const {
2934 BlockIndexHeader *localBlockIndex;
2935 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2936 return localBlockIndex->index[idx];
2940 get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex)
const {
2941 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2942 debug::DebugLock lock(mutex);
2944 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2945 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2946 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2947 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2948 assert(tailBase != INVALID_BLOCK_BASE);
2951 auto offset =
static_cast<size_t>(
2952 static_cast<typename std::make_signed<index_t>::type
>(index - tailBase) / BLOCK_SIZE);
2953 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2954 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index &&
2955 localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) !=
nullptr);
2959 bool new_block_index() {
2960 auto prev = blockIndex.load(std::memory_order_relaxed);
2961 size_t prevCapacity = prev ==
nullptr ? 0 : prev->capacity;
2962 auto entryCount = prev ==
nullptr ? nextBlockIndexCapacity : prevCapacity;
2963 auto raw =
static_cast<char *
>((Traits::malloc)(
2964 sizeof(BlockIndexHeader) +
2965 std::alignment_of<BlockIndexEntry>::value - 1 +
sizeof(BlockIndexEntry) * entryCount +
2966 std::alignment_of<BlockIndexEntry *>::value - 1 +
2967 sizeof(BlockIndexEntry * ) * nextBlockIndexCapacity));
2968 if (raw ==
nullptr) {
2972 auto header =
new(raw) BlockIndexHeader;
2973 auto entries =
reinterpret_cast<BlockIndexEntry *
>(details::align_for<BlockIndexEntry>(
2974 raw +
sizeof(BlockIndexHeader)));
2975 auto index =
reinterpret_cast<BlockIndexEntry **
>(details::align_for<BlockIndexEntry *>(
2976 reinterpret_cast<char *
>(entries) +
sizeof(BlockIndexEntry) * entryCount));
2977 if (prev !=
nullptr) {
2978 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2979 auto prevPos = prevTail;
2982 prevPos = (prevPos + 1) & (prev->capacity - 1);
2983 index[i++] = prev->index[prevPos];
2984 }
while (prevPos != prevTail);
2985 assert(i == prevCapacity);
2987 for (
size_t i = 0; i != entryCount; ++i) {
2988 new(entries + i) BlockIndexEntry;
2989 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2990 index[prevCapacity + i] = entries + i;
2992 header->prev = prev;
2993 header->entries = entries;
2994 header->index = index;
2995 header->capacity = nextBlockIndexCapacity;
2996 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1),
2997 std::memory_order_relaxed);
2999 blockIndex.store(header, std::memory_order_release);
3001 nextBlockIndexCapacity <<= 1;
3007 size_t nextBlockIndexCapacity;
3008 std::atomic<BlockIndexHeader *> blockIndex;
3010 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3012 details::ThreadExitListener threadExitListener;
3016 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 3018 ImplicitProducer* nextImplicitProducer;
3022 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 3023 mutable debug::DebugMutex mutex;
3026 friend struct MemStats;
3035 void populate_initial_block_list(
size_t blockCount) {
3036 initialBlockPoolSize = blockCount;
3037 if (initialBlockPoolSize == 0) {
3038 initialBlockPool =
nullptr;
3042 initialBlockPool = create_array<Block>(blockCount);
3043 if (initialBlockPool ==
nullptr) {
3044 initialBlockPoolSize = 0;
3046 for (
size_t i = 0; i < initialBlockPoolSize; ++i) {
3047 initialBlockPool[i].dynamicallyAllocated =
false;
3051 inline Block *try_get_block_from_initial_pool() {
3052 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3056 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3058 return index < initialBlockPoolSize ? (initialBlockPool + index) :
nullptr;
3061 inline void add_block_to_free_list(Block *block) {
3063 block->owner =
nullptr;
3065 freeList.add(block);
3068 inline void add_blocks_to_free_list(Block *block) {
3069 while (block !=
nullptr) {
3070 auto next = block->next;
3071 add_block_to_free_list(block);
3076 inline Block *try_get_block_from_free_list() {
3077 return freeList.try_get();
3081 template<AllocationMode canAlloc>
3082 Block *requisition_block() {
3083 auto block = try_get_block_from_initial_pool();
3084 if (block !=
nullptr) {
3088 block = try_get_block_from_free_list();
3089 if (block !=
nullptr) {
3093 if (canAlloc == CanAlloc) {
3094 return create<Block>();
3104 size_t allocatedBlocks;
3107 size_t ownedBlocksExplicit;
3108 size_t ownedBlocksImplicit;
3109 size_t implicitProducers;
3110 size_t explicitProducers;
3111 size_t elementsEnqueued;
3112 size_t blockClassBytes;
3113 size_t queueClassBytes;
3114 size_t implicitBlockIndexBytes;
3115 size_t explicitBlockIndexBytes;
3117 friend class ConcurrentQueue;
3120 static MemStats getFor(ConcurrentQueue* q)
3122 MemStats stats = { 0 };
3124 stats.elementsEnqueued = q->size_approx();
3126 auto block = q->freeList.head_unsafe();
3127 while (block !=
nullptr) {
3128 ++stats.allocatedBlocks;
3130 block = block->freeListNext.load(std::memory_order_relaxed);
3133 for (
auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
3134 bool implicit =
dynamic_cast<ImplicitProducer*
>(ptr) !=
nullptr;
3135 stats.implicitProducers += implicit ? 1 : 0;
3136 stats.explicitProducers += implicit ? 0 : 1;
3139 auto prod =
static_cast<ImplicitProducer*
>(ptr);
3140 stats.queueClassBytes +=
sizeof(ImplicitProducer);
3141 auto head = prod->headIndex.load(std::memory_order_relaxed);
3142 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3143 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3144 if (hash !=
nullptr) {
3145 for (
size_t i = 0; i != hash->capacity; ++i) {
3146 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) !=
nullptr) {
3147 ++stats.allocatedBlocks;
3148 ++stats.ownedBlocksImplicit;
3151 stats.implicitBlockIndexBytes += hash->capacity *
sizeof(
typename ImplicitProducer::BlockIndexEntry);
3152 for (; hash !=
nullptr; hash = hash->prev) {
3153 stats.implicitBlockIndexBytes +=
sizeof(
typename ImplicitProducer::BlockIndexHeader) + hash->capacity *
sizeof(
typename ImplicitProducer::BlockIndexEntry*);
3156 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3162 auto prod =
static_cast<ExplicitProducer*
>(ptr);
3163 stats.queueClassBytes +=
sizeof(ExplicitProducer);
3164 auto tailBlock = prod->tailBlock;
3165 bool wasNonEmpty =
false;
3166 if (tailBlock !=
nullptr) {
3167 auto block = tailBlock;
3169 ++stats.allocatedBlocks;
3170 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3172 wasNonEmpty = wasNonEmpty || block != tailBlock;
3174 ++stats.ownedBlocksExplicit;
3175 block = block->next;
3176 }
while (block != tailBlock);
3178 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3179 while (index !=
nullptr) {
3180 stats.explicitBlockIndexBytes +=
sizeof(
typename ExplicitProducer::BlockIndexHeader) + index->size *
sizeof(
typename ExplicitProducer::BlockIndexEntry);
3181 index =
static_cast<typename ExplicitProducer::BlockIndexHeader*
>(index->prev);
3186 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3187 stats.allocatedBlocks += freeOnInitialPool;
3188 stats.freeBlocks += freeOnInitialPool;
3190 stats.blockClassBytes =
sizeof(Block) * stats.allocatedBlocks;
3191 stats.queueClassBytes +=
sizeof(ConcurrentQueue);
3198 MemStats getMemStats()
3200 return MemStats::getFor(
this);
3203 friend struct MemStats;
3211 ProducerBase *recycle_or_create_producer(
bool isExplicit) {
3213 return recycle_or_create_producer(isExplicit, recycled);
3216 ProducerBase *recycle_or_create_producer(
bool isExplicit,
bool &recycled) {
3217 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3218 debug::DebugLock lock(implicitProdMutex);
3221 for (
auto ptr = producerListTail.load(std::memory_order_acquire);
3222 ptr !=
nullptr; ptr = ptr->next_prod()) {
3223 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3224 bool expected =
true;
3225 if (ptr->inactive.compare_exchange_strong(expected,
false,
3226 std::memory_order_acquire,
3227 std::memory_order_relaxed)) {
3236 return add_producer(isExplicit ? static_cast<ProducerBase *>(create<ExplicitProducer>(
this))
3237 : create<ImplicitProducer>(
this));
3240 ProducerBase *add_producer(ProducerBase *producer) {
3242 if (producer ==
nullptr) {
3246 producerCount.fetch_add(1, std::memory_order_relaxed);
3249 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3251 producer->next = prevTail;
3252 }
while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release,
3253 std::memory_order_relaxed));
3255 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 3256 if (producer->isExplicit) {
3257 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3259 static_cast<ExplicitProducer*
>(producer)->nextExplicitProducer = prevTailExplicit;
3260 }
while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3263 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3265 static_cast<ImplicitProducer*
>(producer)->nextImplicitProducer = prevTailImplicit;
3266 }
while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3273 void reown_producers() {
3277 for (
auto ptr = producerListTail.load(std::memory_order_relaxed);
3278 ptr !=
nullptr; ptr = ptr->next_prod()) {
3288 struct ImplicitProducerKVP {
3289 std::atomic<details::thread_id_t> key;
3290 ImplicitProducer *value;
3292 ImplicitProducerKVP()
3295 ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3296 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3297 value = other.value;
3300 inline ImplicitProducerKVP &operator=(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3305 inline void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT {
3306 if (
this != &other) {
3307 details::swap_relaxed(key, other.key);
3308 std::swap(value, other.value);
3313 template<
typename XT,
typename XTraits>
3314 friend void moodycamel::swap(
typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &,
3315 typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &) MOODYCAMEL_NOEXCEPT;
3317 struct ImplicitProducerHash {
3319 ImplicitProducerKVP *entries;
3320 ImplicitProducerHash *prev;
3323 inline void populate_initial_implicit_producer_hash() {
3324 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return;
3326 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3327 auto hash = &initialImplicitProducerHash;
3328 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3329 hash->entries = &initialImplicitProducerHashEntries[0];
3330 for (
size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3331 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id,
3332 std::memory_order_relaxed);
3334 hash->prev =
nullptr;
3335 implicitProducerHash.store(hash, std::memory_order_relaxed);
3338 void swap_implicit_producer_hashes(ConcurrentQueue &other) {
3339 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
return;
3342 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3343 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3344 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3346 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3348 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3349 if (implicitProducerHash.load(std::memory_order_relaxed) ==
3350 &other.initialImplicitProducerHash) {
3351 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3353 ImplicitProducerHash *hash;
3354 for (hash = implicitProducerHash.load(std::memory_order_relaxed);
3355 hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3358 hash->prev = &initialImplicitProducerHash;
3360 if (other.implicitProducerHash.load(std::memory_order_relaxed) ==
3361 &initialImplicitProducerHash) {
3362 other.implicitProducerHash.store(&other.initialImplicitProducerHash,
3363 std::memory_order_relaxed);
3365 ImplicitProducerHash *hash;
3366 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
3367 hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3370 hash->prev = &other.initialImplicitProducerHash;
3375 ImplicitProducer *get_or_add_implicit_producer() {
3386 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3387 debug::DebugLock lock(implicitProdMutex);
3390 auto id = details::thread_id();
3391 auto hashedId = details::hash_thread_id(
id);
3393 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3394 for (
auto hash = mainHash; hash !=
nullptr; hash = hash->prev) {
3396 auto index = hashedId;
3398 index &= hash->capacity - 1;
3400 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3401 if (probedKey ==
id) {
3407 auto value = hash->entries[index].value;
3408 if (hash != mainHash) {
3411 index &= mainHash->capacity - 1;
3412 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3413 auto empty = details::invalid_thread_id;
3414 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3415 auto reusable = details::invalid_thread_id2;
3416 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3417 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable,
id, std::memory_order_acquire, std::memory_order_acquire))) {
3419 if ((probedKey == empty &&
3420 mainHash->entries[index].key.compare_exchange_strong(empty,
id,
3421 std::memory_order_relaxed,
3422 std::memory_order_relaxed))) {
3424 mainHash->entries[index].value = value;
3433 if (probedKey == details::invalid_thread_id) {
3441 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3443 if (newCount >= (mainHash->capacity >> 1) &&
3444 !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3449 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3450 if (newCount >= (mainHash->capacity >> 1)) {
3451 auto newCapacity = mainHash->capacity << 1;
3452 while (newCount >= (newCapacity >> 1)) {
3455 auto raw =
static_cast<char *
>((Traits::malloc)(
3456 sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 +
3457 sizeof(ImplicitProducerKVP) * newCapacity));
3458 if (raw ==
nullptr) {
3460 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3461 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3465 auto newHash =
new(raw) ImplicitProducerHash;
3466 newHash->capacity = newCapacity;
3467 newHash->entries =
reinterpret_cast<ImplicitProducerKVP *
>(details::align_for<ImplicitProducerKVP>(
3468 raw +
sizeof(ImplicitProducerHash)));
3469 for (
size_t i = 0; i != newCapacity; ++i) {
3470 new(newHash->entries + i) ImplicitProducerKVP;
3471 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3473 newHash->prev = mainHash;
3474 implicitProducerHash.store(newHash, std::memory_order_release);
3475 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3478 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3485 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3487 auto producer =
static_cast<ImplicitProducer *
>(recycle_or_create_producer(
false,
3489 if (producer ==
nullptr) {
3490 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3494 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3497 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3498 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3499 producer->threadExitListener.userData = producer;
3500 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3503 auto index = hashedId;
3505 index &= mainHash->capacity - 1;
3506 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3508 auto empty = details::invalid_thread_id;
3509 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3510 auto reusable = details::invalid_thread_id2;
3511 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3512 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable,
id, std::memory_order_acquire, std::memory_order_acquire))) {
3514 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id,
3515 std::memory_order_relaxed,
3516 std::memory_order_relaxed))) {
3518 mainHash->entries[index].value = producer;
3529 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3533 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3534 void implicit_producer_thread_exited(ImplicitProducer* producer)
3537 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3540 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3541 debug::DebugLock lock(implicitProdMutex);
3543 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3544 assert(hash !=
nullptr);
3545 auto id = details::thread_id();
3546 auto hashedId = details::hash_thread_id(
id);
3547 details::thread_id_t probedKey;
3551 for (; hash !=
nullptr; hash = hash->prev) {
3552 auto index = hashedId;
3554 index &= hash->capacity - 1;
3555 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3556 if (probedKey ==
id) {
3557 hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3561 }
while (probedKey != details::invalid_thread_id);
3565 producer->inactive.store(
true, std::memory_order_release);
3568 static void implicit_producer_thread_exited_callback(
void* userData)
3570 auto producer =
static_cast<ImplicitProducer*
>(userData);
3571 auto queue = producer->parent;
3572 queue->implicit_producer_thread_exited(producer);
3580 template<
typename U>
3581 static inline U *create_array(
size_t count) {
3583 auto p =
static_cast<U *
>((Traits::malloc)(
sizeof(U) * count));
3588 for (
size_t i = 0; i != count; ++i) {
3594 template<
typename U>
3595 static inline void destroy_array(U *p,
size_t count) {
3598 for (
size_t i = count; i != 0;) {
3605 template<
typename U>
3606 static inline U *create() {
3607 auto p = (Traits::malloc)(
sizeof(U));
3608 return p !=
nullptr ?
new(p) U :
nullptr;
3611 template<
typename U,
typename A1>
3612 static inline U *create(A1 &&a1) {
3613 auto p = (Traits::malloc)(
sizeof(U));
3614 return p !=
nullptr ?
new(p) U(std::forward<A1>(a1)) :
nullptr;
3617 template<
typename U>
3618 static inline void destroy(U *p) {
3626 std::atomic<ProducerBase *> producerListTail;
3627 std::atomic<std::uint32_t> producerCount;
3629 std::atomic<size_t> initialBlockPoolIndex;
3630 Block *initialBlockPool;
3631 size_t initialBlockPoolSize;
3633 #if !MCDBGQ_USEDEBUGFREELIST 3634 FreeList<Block> freeList;
3636 debug::DebugFreeList<Block> freeList;
3639 std::atomic<ImplicitProducerHash *> implicitProducerHash;
3640 std::atomic<size_t> implicitProducerHashCount;
3641 ImplicitProducerHash initialImplicitProducerHash;
3642 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3643 std::atomic_flag implicitProducerHashResizeInProgress;
3645 std::atomic<std::uint32_t> nextExplicitConsumerId;
3646 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3648 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3649 debug::DebugMutex implicitProdMutex;
3652 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 3653 std::atomic<ExplicitProducer*> explicitProducers;
3654 std::atomic<ImplicitProducer*> implicitProducers;
3659 template<
typename T,
typename Traits>
3660 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits> &queue)
3661 : producer(queue.recycle_or_create_producer(true)) {
3662 if (producer !=
nullptr) {
3663 producer->token =
this;
3667 template<
typename T,
typename Traits>
3668 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits> &queue)
3670 reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)->recycle_or_create_producer(true)) {
3671 if (producer !=
nullptr) {
3672 producer->token =
this;
3676 template<
typename T,
typename Traits>
3677 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits> &queue)
3678 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3679 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3680 lastKnownGlobalOffset = -1;
3683 template<
typename T,
typename Traits>
3684 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits> &queue)
3685 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3686 initialOffset =
reinterpret_cast<ConcurrentQueue <T, Traits> *
>(&queue)->nextExplicitConsumerId.fetch_add(
3687 1, std::memory_order_release);
3688 lastKnownGlobalOffset = -1;
3691 template<
typename T,
typename Traits>
3692 inline void swap(ConcurrentQueue<T, Traits> &a, ConcurrentQueue<T, Traits> &b) MOODYCAMEL_NOEXCEPT {
3696 inline void swap(ProducerToken &a, ProducerToken &b) MOODYCAMEL_NOEXCEPT {
3700 inline void swap(ConsumerToken &a, ConsumerToken &b) MOODYCAMEL_NOEXCEPT {
3704 template<
typename T,
typename Traits>
3705 inline void swap(
typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
3706 typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT {
3714 #if defined(__GNUC__) 3715 #pragma GCC diagnostic pop 3718 #endif // DMLC_CONCURRENTQUEUE_H_ 3719 Definition: optional.h:241
unsigned index_t
this defines the unsigned integer type that can normally be used to store feature index ...
Definition: data.h:32
namespace for dmlc
Definition: array_view.h:12