7 #ifndef DMLC_CONCURRENCY_H_ 8 #define DMLC_CONCURRENCY_H_ 17 #include <condition_variable> 32 #if defined(__clang__) 33 #pragma clang diagnostic push 34 #pragma clang diagnostic ignored "-Wbraced-scalar-init" 35 #endif // defined(__clang__) 38 #if defined(__clang__) 39 #pragma clang diagnostic pop 40 #endif // defined(__clang__) 46 inline void lock() noexcept(
true);
50 inline void unlock() noexcept(
true);
53 std::atomic_flag lock_;
88 void Push(E&& e,
int priority = 0);
101 template <
typename E>
102 void PushFront(E&& e,
int priority = 0);
117 void SignalForKill();
128 inline bool operator<(
const Entry &b)
const {
129 return priority < b.priority;
134 std::condition_variable cv_;
135 std::atomic<bool> exit_now_;
138 std::vector<Entry> priority_queue_;
140 std::deque<T> fifo_queue_;
148 while (lock_.test_and_set(std::memory_order_acquire)) {
153 lock_.clear(std::memory_order_release);
156 template <
typename T, ConcurrentQueueType type>
158 : exit_now_{
false}, nwait_consumer_{0} {}
160 template <
typename T, ConcurrentQueueType type>
161 template <
typename E>
163 static_assert(std::is_same<
typename std::remove_cv<
164 typename std::remove_reference<E>::type>::type,
166 "Types must match.");
169 std::lock_guard<std::mutex> lock{mutex_};
171 fifo_queue_.emplace_back(std::forward<E>(e));
172 notify = nwait_consumer_ != 0;
175 entry.data = std::move(e);
176 entry.priority = priority;
177 priority_queue_.push_back(std::move(entry));
178 std::push_heap(priority_queue_.begin(), priority_queue_.end());
179 notify = nwait_consumer_ != 0;
182 if (notify) cv_.notify_one();
185 template <
typename T, ConcurrentQueueType type>
186 template <
typename E>
188 static_assert(std::is_same<
typename std::remove_cv<
189 typename std::remove_reference<E>::type>::type,
191 "Types must match.");
194 std::lock_guard<std::mutex> lock{mutex_};
196 fifo_queue_.emplace_front(std::forward<E>(e));
197 notify = nwait_consumer_ != 0;
200 entry.data = std::move(e);
201 entry.priority = priority;
202 priority_queue_.push_back(std::move(entry));
203 std::push_heap(priority_queue_.begin(), priority_queue_.end());
204 notify = nwait_consumer_ != 0;
207 if (notify) cv_.notify_one();
210 template <
typename T, ConcurrentQueueType type>
212 std::unique_lock<std::mutex> lock{mutex_};
215 cv_.wait(lock, [
this] {
216 return !fifo_queue_.empty() || exit_now_.load();
219 if (!exit_now_.load()) {
220 *rv = std::move(fifo_queue_.front());
221 fifo_queue_.pop_front();
228 cv_.wait(lock, [
this] {
229 return !priority_queue_.empty() || exit_now_.load();
232 if (!exit_now_.load()) {
233 std::pop_heap(priority_queue_.begin(), priority_queue_.end());
234 *rv = std::move(priority_queue_.back().data);
235 priority_queue_.pop_back();
243 template <
typename T, ConcurrentQueueType type>
246 std::lock_guard<std::mutex> lock{mutex_};
247 exit_now_.store(
true);
252 template <
typename T, ConcurrentQueueType type>
254 std::lock_guard<std::mutex> lock{mutex_};
256 return fifo_queue_.size();
258 return priority_queue_.size();
262 #endif // DMLC_USE_CXX11 263 #endif // DMLC_CONCURRENCY_H_ void lock() noexcept(true)
Acquire lock.
Definition: concurrency.h:147
bool Pop(T *rv)
Pop element from the queue.
Definition: concurrency.h:211
ConcurrentBlockingQueue()
Definition: concurrency.h:157
size_t Size()
Get the size of the queue.
Definition: concurrency.h:253
ConcurrentQueueType
type of concurrent queue
Definition: concurrency.h:61
void unlock() noexcept(true)
Release lock.
Definition: concurrency.h:152
void Push(E &&e, int priority=0)
Push element to the end of the queue.
Definition: concurrency.h:162
void SignalForKill()
Signal the queue for destruction.
Definition: concurrency.h:244
namespace for dmlc
Definition: array_view.h:12
#define DISALLOW_COPY_AND_ASSIGN(T)
Disable copy constructor and assignment operator.
Definition: base.h:174
void PushFront(E &&e, int priority=0)
Push element to the front of the queue. Only works for FIFO queue. For priority queue it is the same as Push.
Definition: concurrency.h:187
Spinlock()
Definition: concurrency.h:36
Simple userspace spinlock implementation.
Definition: concurrency.h:25
Concurrent blocking queue.
Definition: concurrency.h:73