Motion Master
Loading...
Searching...
No Matches
thread_safe_queue.h
Go to the documentation of this file.
1#pragma once
2
3#include <condition_variable>
4#include <mutex>
5#include <optional>
6#include <queue>
7
9
22template <typename T>
24 private:
// FIFO storage for the queued elements. Every visible method that touches it
// does so while holding mtx_.
26 std::queue<T> queue_;
27
// Set to true by abort(). Once set, the insert methods return false and
// blocked waiters are released.
29 bool aborted_ = false;
30
// Maximum number of elements; 0 means "no limit" (the insert paths only
// enforce the bound when this value is greater than zero).
32 size_t capacity_;
33
// Guards queue_ and aborted_. Declared mutable so that const accessors such as
// empty(), size() and peek() can lock it.
35 mutable std::mutex mtx_;
36
// Signalled after an element is inserted; consumers wait on it until the
// queue is non-empty or aborted.
38 std::condition_variable not_empty_cv_;
39
// Signalled after an element is popped; producers on a bounded queue wait on
// it until a slot is free or the queue is aborted.
41 std::condition_variable not_full_cv_;
42
43 public:
45 explicit ThreadSafeQueue(size_t capacity = 0) : capacity_(capacity) {}
46
49
70 void abort() {
71 std::unique_lock<std::mutex> lock(mtx_);
72 aborted_ = true;
73 not_empty_cv_.notify_all(); // Force wake-up
74 not_full_cv_.notify_all();
75 }
76
89 bool aborted() const {
90 std::lock_guard<std::mutex> lock(mtx_);
91 return aborted_;
92 }
93
112 bool push(T value) {
113 std::unique_lock<std::mutex> lock(mtx_);
114
115 if (capacity_ > 0) {
116 not_full_cv_.wait(
117 lock, [this] { return queue_.size() < capacity_ || aborted_; });
118 }
119
120 if (aborted_) {
121 return false; // Do not push if aborted
122 }
123
124 queue_.push(std::move(value));
125 not_empty_cv_.notify_one();
126 return true;
127 }
128
148 bool try_push(T value) {
149 std::lock_guard<std::mutex> lock(mtx_);
150
151 if (aborted_) {
152 return false; // Do not push if aborted
153 }
154
155 if (capacity_ > 0 && queue_.size() >= capacity_) {
156 return false;
157 }
158
159 queue_.push(std::move(value));
160 not_empty_cv_.notify_one();
161 return true;
162 }
163
179 std::unique_lock<std::mutex> lock(mtx_);
180 not_empty_cv_.wait(lock, [this] { return !queue_.empty() || aborted_; });
181 }
182
199 void notify_all() {
200 std::lock_guard<std::mutex> lock(mtx_);
201 not_empty_cv_.notify_all();
202 not_full_cv_.notify_all();
203 }
204
222 std::optional<T> wait_and_pop() {
223 std::unique_lock<std::mutex> lock(mtx_);
224
225 not_empty_cv_.wait(lock, [this] { return !queue_.empty() || aborted_; });
226
227 if (queue_.empty()) {
228 return std::nullopt;
229 }
230
231 T value = std::move(queue_.front());
232 queue_.pop();
233 not_full_cv_.notify_one();
234 return value;
235 }
236
253 std::optional<T> try_pop() {
254 std::lock_guard<std::mutex> lock(mtx_);
255
256 if (queue_.empty()) {
257 return std::nullopt;
258 }
259
260 T value = std::move(queue_.front());
261 queue_.pop();
262 not_full_cv_.notify_one();
263 return value;
264 }
265
276 bool empty() const {
277 std::lock_guard<std::mutex> lock(mtx_);
278 return queue_.empty();
279 }
280
291 size_t size() const {
292 std::lock_guard<std::mutex> lock(mtx_);
293 return queue_.size();
294 }
295
305 size_t capacity() const { return capacity_; }
306
307 // -------- Moodycamel-style API --------
308 // Provides methods (enqueue, try_enqueue, emplace, try_emplace, wait_dequeue,
309 // try_dequeue, peek) that match moodycamel::ReaderWriterQueue for easy
310 // replacement.
311
327 void enqueue(T value) { push(std::move(value)); }
328
346 bool try_enqueue(T value) { return try_push(std::move(value)); }
347
370 template <typename... Args>
371 bool emplace(Args&&... args) {
372 std::unique_lock<std::mutex> lock(mtx_);
373
374 if (capacity_ > 0) {
375 not_full_cv_.wait(
376 lock, [this] { return queue_.size() < capacity_ || aborted_; });
377 }
378
379 if (aborted_) {
380 return false;
381 }
382
383 queue_.emplace(std::forward<Args>(args)...);
384 not_empty_cv_.notify_one();
385 return true;
386 }
387
409 template <typename... Args>
410 bool try_emplace(Args&&... args) {
411 std::lock_guard<std::mutex> lock(mtx_);
412
413 if (aborted_) {
414 return false; // Do not push if aborted
415 }
416
417 if (capacity_ > 0 && queue_.size() >= capacity_) {
418 return false;
419 }
420
421 queue_.emplace(std::forward<Args>(args)...);
422 not_empty_cv_.notify_one();
423 return true;
424 }
425
444 std::optional<T> wait_dequeue() { return wait_and_pop(); }
445
464 bool try_dequeue(T& out) {
465 auto opt = try_pop();
466 if (!opt) {
467 return false;
468 }
469 out = std::move(*opt);
470 return true;
471 }
472
488 std::optional<T> try_dequeue() { return try_pop(); }
489
509 bool peek(T& out) const {
510 std::lock_guard<std::mutex> lock(mtx_);
511
512 if (queue_.empty()) {
513 return false;
514 }
515
516 out = queue_.front(); // copy or move depending on T
517 return true;
518 }
519
537 std::optional<T> peek() const {
538 std::lock_guard<std::mutex> lock(mtx_);
539
540 if (queue_.empty()) {
541 return std::nullopt;
542 }
543
544 return queue_.front(); // copy
545 }
546
565 template <typename Predicate>
566 void removeIf(Predicate pred) {
567 std::lock_guard<std::mutex> lock(mtx_);
568 std::queue<T> newQueue;
569
570 while (!queue_.empty()) {
571 T& item = queue_.front();
572 if (!pred(item)) {
573 newQueue.push(std::move(item));
574 }
575 queue_.pop();
576 }
577
578 queue_ = std::move(newQueue);
579 }
580};
581
582} // namespace mm::core::containers
ThreadSafeQueue(size_t capacity=0)
Construct a queue with an optional capacity limit.
Definition thread_safe_queue.h:45
void removeIf(Predicate pred)
Remove all elements that satisfy a given predicate.
Definition thread_safe_queue.h:566
void notify_all()
Wake all threads currently waiting on the queue.
Definition thread_safe_queue.h:199
bool emplace(Args &&... args)
Construct and insert an element into the queue in place.
Definition thread_safe_queue.h:371
bool peek(T &out) const
Peek at the front element without removing it.
Definition thread_safe_queue.h:509
bool push(T value)
Push an element into the queue, blocking if necessary.
Definition thread_safe_queue.h:112
ThreadSafeQueue & operator=(const ThreadSafeQueue &)=delete
bool try_enqueue(T value)
Attempt to enqueue an element into the queue without blocking.
Definition thread_safe_queue.h:346
bool try_push(T value)
Attempt to push an element into the queue without blocking.
Definition thread_safe_queue.h:148
void wait_for_data()
Block until the queue contains at least one element or is aborted.
Definition thread_safe_queue.h:178
bool try_emplace(Args &&... args)
Attempt to construct and insert an element into the queue without blocking.
Definition thread_safe_queue.h:410
bool empty() const
Check whether the queue is empty.
Definition thread_safe_queue.h:276
std::optional< T > wait_and_pop()
Wait for and remove the front element from the queue.
Definition thread_safe_queue.h:222
ThreadSafeQueue(const ThreadSafeQueue &)=delete
std::optional< T > wait_dequeue()
Wait for and dequeue an element from the queue.
Definition thread_safe_queue.h:444
std::optional< T > peek() const
Peek at the front element without removing it.
Definition thread_safe_queue.h:537
std::optional< T > try_pop()
Attempt to remove and return the front element without blocking.
Definition thread_safe_queue.h:253
bool try_dequeue(T &out)
Attempt to dequeue an element without blocking.
Definition thread_safe_queue.h:464
bool aborted() const
Check whether the queue has been aborted.
Definition thread_safe_queue.h:89
size_t size() const
Get the current number of elements in the queue.
Definition thread_safe_queue.h:291
void abort()
Abort the queue and unblock all waiting threads.
Definition thread_safe_queue.h:70
size_t capacity() const
Get the maximum number of elements the queue can hold.
Definition thread_safe_queue.h:305
std::optional< T > try_dequeue()
Attempt to dequeue an element without blocking.
Definition thread_safe_queue.h:488
void enqueue(T value)
Enqueue an element into the queue (blocking if necessary).
Definition thread_safe_queue.h:327
uint8_t * value
Definition co_dictionary.h:9
Definition thread_safe_queue.h:8