3#include <condition_variable>
29 bool aborted_ =
false;
35 mutable std::mutex mtx_;
38 std::condition_variable not_empty_cv_;
41 std::condition_variable not_full_cv_;
71 std::unique_lock<std::mutex> lock(mtx_);
73 not_empty_cv_.notify_all();
74 not_full_cv_.notify_all();
90 std::lock_guard<std::mutex> lock(mtx_);
113 std::unique_lock<std::mutex> lock(mtx_);
117 lock, [
this] {
return queue_.size() < capacity_ || aborted_; });
124 queue_.push(std::move(
value));
125 not_empty_cv_.notify_one();
149 std::lock_guard<std::mutex> lock(mtx_);
155 if (capacity_ > 0 && queue_.size() >= capacity_) {
159 queue_.push(std::move(
value));
160 not_empty_cv_.notify_one();
179 std::unique_lock<std::mutex> lock(mtx_);
180 not_empty_cv_.wait(lock, [
this] {
return !queue_.empty() || aborted_; });
200 std::lock_guard<std::mutex> lock(mtx_);
201 not_empty_cv_.notify_all();
202 not_full_cv_.notify_all();
223 std::unique_lock<std::mutex> lock(mtx_);
225 not_empty_cv_.wait(lock, [
this] {
return !queue_.empty() || aborted_; });
227 if (queue_.empty()) {
231 T
value = std::move(queue_.front());
233 not_full_cv_.notify_one();
254 std::lock_guard<std::mutex> lock(mtx_);
256 if (queue_.empty()) {
260 T
value = std::move(queue_.front());
262 not_full_cv_.notify_one();
277 std::lock_guard<std::mutex> lock(mtx_);
278 return queue_.empty();
292 std::lock_guard<std::mutex> lock(mtx_);
293 return queue_.size();
370 template <
typename... Args>
372 std::unique_lock<std::mutex> lock(mtx_);
376 lock, [
this] {
return queue_.size() < capacity_ || aborted_; });
383 queue_.emplace(std::forward<Args>(args)...);
384 not_empty_cv_.notify_one();
409 template <
typename... Args>
411 std::lock_guard<std::mutex> lock(mtx_);
417 if (capacity_ > 0 && queue_.size() >= capacity_) {
421 queue_.emplace(std::forward<Args>(args)...);
422 not_empty_cv_.notify_one();
469 out = std::move(*opt);
510 std::lock_guard<std::mutex> lock(mtx_);
512 if (queue_.empty()) {
516 out = queue_.front();
537 std::optional<T>
peek()
const {
538 std::lock_guard<std::mutex> lock(mtx_);
540 if (queue_.empty()) {
544 return queue_.front();
565 template <
typename Predicate>
567 std::lock_guard<std::mutex> lock(mtx_);
568 std::queue<T> newQueue;
570 while (!queue_.empty()) {
571 T& item = queue_.front();
573 newQueue.push(std::move(item));
578 queue_ = std::move(newQueue);
ThreadSafeQueue(size_t capacity=0)
Construct a queue with an optional capacity limit.
Definition thread_safe_queue.h:45
void removeIf(Predicate pred)
Remove all elements that satisfy a given predicate.
Definition thread_safe_queue.h:566
void notify_all()
Wake all threads currently waiting on the queue.
Definition thread_safe_queue.h:199
bool emplace(Args &&... args)
Construct and insert an element into the queue in place.
Definition thread_safe_queue.h:371
bool peek(T &out) const
Peek at the front element without removing it.
Definition thread_safe_queue.h:509
bool push(T value)
Push an element into the queue, blocking if necessary.
Definition thread_safe_queue.h:112
ThreadSafeQueue & operator=(const ThreadSafeQueue &)=delete
bool try_enqueue(T value)
Attempt to enqueue an element into the queue without blocking.
Definition thread_safe_queue.h:346
bool try_push(T value)
Attempt to push an element into the queue without blocking.
Definition thread_safe_queue.h:148
void wait_for_data()
Block until the queue contains at least one element or is aborted.
Definition thread_safe_queue.h:178
bool try_emplace(Args &&... args)
Attempt to construct and insert an element into the queue without blocking.
Definition thread_safe_queue.h:410
bool empty() const
Check whether the queue is empty.
Definition thread_safe_queue.h:276
std::optional< T > wait_and_pop()
Wait for and remove the front element from the queue.
Definition thread_safe_queue.h:222
ThreadSafeQueue(const ThreadSafeQueue &)=delete
std::optional< T > wait_dequeue()
Wait for and dequeue an element from the queue.
Definition thread_safe_queue.h:444
std::optional< T > peek() const
Peek at the front element without removing it.
Definition thread_safe_queue.h:537
std::optional< T > try_pop()
Attempt to remove and return the front element without blocking.
Definition thread_safe_queue.h:253
bool try_dequeue(T &out)
Attempt to dequeue an element without blocking.
Definition thread_safe_queue.h:464
bool aborted() const
Check whether the queue has been aborted.
Definition thread_safe_queue.h:89
size_t size() const
Get the current number of elements in the queue.
Definition thread_safe_queue.h:291
void abort()
Abort the queue and unblock all waiting threads.
Definition thread_safe_queue.h:70
size_t capacity() const
Get the maximum number of elements the queue can hold.
Definition thread_safe_queue.h:305
std::optional< T > try_dequeue()
Attempt to dequeue an element without blocking.
Definition thread_safe_queue.h:488
void enqueue(T value)
Enqueue an element into the queue (blocking if necessary).
Definition thread_safe_queue.h:327
uint8_t * value
Definition co_dictionary.h:9
Definition thread_safe_queue.h:8