Motion Master
Loading...
Searching...
No Matches
thread_safe_queue.h
Go to the documentation of this file.
1#pragma once
2
3#include <condition_variable>
4#include <mutex>
5#include <optional>
6#include <queue>
7
9
/// @brief Mutex-guarded FIFO queue with an optional capacity bound.
///
/// Every operation that touches the underlying std::queue takes `mtx_`, so
/// the queue may be shared between any number of producer and consumer
/// threads. A capacity of 0 means "unbounded": the blocking insert operations
/// never wait and the non-blocking ones never report "full".
///
/// The class offers no close/shutdown operation: a consumer blocked in
/// wait_and_pop() / wait_dequeue() stays blocked until some thread inserts an
/// element.
///
/// @tparam T Element type. Must be movable; the peek() overloads additionally
///           require T to be copyable.
template <typename T>
class ThreadSafeQueue {
 private:
  /// Underlying FIFO storage; only accessed while holding `mtx_`.
  std::queue<T> queue_;

  /// Maximum number of queued elements; 0 disables the bound. Never modified
  /// after construction, which is why capacity() reads it without locking.
  size_t capacity_;

  /// Guards `queue_`. Mutable so that const observers can lock it.
  mutable std::mutex mtx_;

  /// Signalled after an element has been inserted.
  std::condition_variable not_empty_cv_;

  /// Signalled after an element has been removed; only ever waited on when
  /// the queue is bounded.
  std::condition_variable not_full_cv_;

 public:
  /// @brief Construct a queue with optional capacity.
  /// @param capacity Maximum number of elements, or 0 (default) for an
  ///                 unbounded queue.
  explicit ThreadSafeQueue(size_t capacity = 0) : capacity_(capacity) {}

  ThreadSafeQueue(const ThreadSafeQueue&) = delete;
  ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

  /// @brief Push a value into the queue.
  ///
  /// If the queue is bounded and full, blocks until a consumer frees a slot.
  /// @param value Element to append; it is moved into the queue.
  void push(T value) { emplace(std::move(value)); }

  /// @brief Try to push a value into the queue without blocking.
  /// @param value Element to append; it is moved into the queue on success.
  /// @return true if the element was queued, false if the queue is bounded
  ///         and already full.
  bool try_push(T value) { return try_emplace(std::move(value)); }

  /// @brief Wait for and pop a value from the queue.
  ///
  /// Blocks until the queue is non-empty.
  /// @return The element that was at the front of the queue.
  T wait_and_pop() {
    std::unique_lock<std::mutex> lock(mtx_);
    not_empty_cv_.wait(lock, [this] { return !queue_.empty(); });
    T value = std::move(queue_.front());
    queue_.pop();
    // A slot has just been freed: wake one producer that may be waiting.
    not_full_cv_.notify_one();
    return value;
  }

  /// @brief Try to pop a value from the queue without blocking.
  /// @return The front element, or std::nullopt if the queue is empty.
  std::optional<T> try_pop() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) {
      return std::nullopt;
    }
    T value = std::move(queue_.front());
    queue_.pop();
    not_full_cv_.notify_one();
    return value;
  }

  /// @brief Check whether the queue is empty.
  /// @return true if no elements were queued at the moment of the call; the
  ///         answer may already be stale when other threads use the queue.
  bool empty() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return queue_.empty();
  }

  /// @brief Get the current number of items in the queue.
  /// @return Element count at the moment of the call; may already be stale
  ///         when other threads use the queue.
  size_t size() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return queue_.size();
  }

  /// @brief Get the maximum capacity of the queue.
  /// @return The capacity passed to the constructor; 0 means unbounded.
  size_t capacity() const { return capacity_; }

  // -------- Moodycamel-style API --------
  // Provides methods (enqueue, try_enqueue, emplace, try_emplace, wait_dequeue,
  // try_dequeue, peek) that match moodycamel::ReaderWriterQueue for easy
  // replacement.

  /// @brief Enqueue a value into the queue (blocking if bounded and full).
  /// @param value Element to append; it is moved into the queue.
  void enqueue(T value) { push(std::move(value)); }

  /// @brief Try to enqueue a value without blocking.
  /// @param value Element to append; it is moved into the queue on success.
  /// @return true if the element was queued, false if the queue is full.
  bool try_enqueue(T value) { return try_push(std::move(value)); }

  /// @brief Emplace a value into the queue in place (blocking if bounded and
  ///        full).
  /// @tparam Args Constructor argument types for T.
  /// @param args Arguments forwarded to T's constructor.
  template <typename... Args>
  void emplace(Args&&... args) {
    std::unique_lock<std::mutex> lock(mtx_);
    if (capacity_ > 0) {
      // Bounded queue: sleep until a consumer has made room.
      not_full_cv_.wait(lock, [this] { return queue_.size() < capacity_; });
    }
    queue_.emplace(std::forward<Args>(args)...);
    not_empty_cv_.notify_one();
  }

  /// @brief Try to emplace a value into the queue in place without blocking.
  /// @tparam Args Constructor argument types for T.
  /// @param args Arguments forwarded to T's constructor on success.
  /// @return true if the element was constructed and queued, false if the
  ///         queue is bounded and already full (nothing is constructed).
  template <typename... Args>
  bool try_emplace(Args&&... args) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (capacity_ > 0 && queue_.size() >= capacity_) {
      return false;
    }
    queue_.emplace(std::forward<Args>(args)...);
    not_empty_cv_.notify_one();
    return true;
  }

  /// @brief Wait for and dequeue a value from the queue.
  /// @return The element that was at the front of the queue.
  T wait_dequeue() { return wait_and_pop(); }

  /// @brief Try to dequeue a value without blocking.
  /// @param[out] out Receives the front element (move-assigned) on success;
  ///                 left untouched when the queue is empty.
  /// @return true if an element was dequeued, false if the queue was empty.
  bool try_dequeue(T& out) {
    auto opt = try_pop();
    if (!opt) {
      return false;
    }
    out = std::move(*opt);
    return true;
  }

  /// @brief Try to dequeue a value without blocking.
  /// @return The front element, or std::nullopt if the queue is empty.
  std::optional<T> try_dequeue() { return try_pop(); }

  /// @brief Peek at the front item without removing it.
  /// @param[out] out Receives a copy of the front element on success; left
  ///                 untouched when the queue is empty.
  /// @return true if an element was copied, false if the queue was empty.
  bool peek(T& out) const {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) {
      return false;
    }
    // Always a copy: in this const member, front() yields a const T&.
    out = queue_.front();
    return true;
  }

  /// @brief Peek at the front item without removing it.
  /// @return A copy of the front element, or std::nullopt if the queue is
  ///         empty.
  std::optional<T> peek() const {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) {
      return std::nullopt;
    }
    return queue_.front();  // copy
  }
};
305
306} // namespace mm::core::containers
ThreadSafeQueue(size_t capacity=0)
Construct a queue with optional capacity.
Definition thread_safe_queue.h:43
T wait_dequeue()
Wait for and dequeue a value from the queue.
Definition thread_safe_queue.h:236
bool peek(T &out) const
Peek at the front item without removing it.
Definition thread_safe_queue.h:278
ThreadSafeQueue & operator=(const ThreadSafeQueue &)=delete
bool try_enqueue(T value)
Try to enqueue a value without blocking.
Definition thread_safe_queue.h:181
bool try_push(T value)
Try to push a value into the queue without blocking.
Definition thread_safe_queue.h:77
void push(T value)
Push a value into the queue.
Definition thread_safe_queue.h:57
bool try_emplace(Args &&... args)
Try to emplace a value into the queue in place without blocking.
Definition thread_safe_queue.h:217
bool empty() const
Check whether the queue is empty.
Definition thread_safe_queue.h:129
T wait_and_pop()
Wait for and pop a value from the queue.
Definition thread_safe_queue.h:94
void emplace(Args &&... args)
Emplace a value into the queue in place (blocking if bounded and full).
Definition thread_safe_queue.h:195
ThreadSafeQueue(const ThreadSafeQueue &)=delete
std::optional< T > peek() const
Peek at the front item without removing it.
Definition thread_safe_queue.h:297
std::optional< T > try_pop()
Try to pop a value from the queue without blocking.
Definition thread_safe_queue.h:113
bool try_dequeue(T &out)
Try to dequeue a value without blocking.
Definition thread_safe_queue.h:249
size_t size() const
Get the current number of items in the queue.
Definition thread_safe_queue.h:141
size_t capacity() const
Get the maximum capacity of the queue.
Definition thread_safe_queue.h:152
std::optional< T > try_dequeue()
Try to dequeue a value without blocking.
Definition thread_safe_queue.h:267
void enqueue(T value)
Enqueue a value into the queue (blocking if bounded and full).
Definition thread_safe_queue.h:168
uint8_t * value
Definition co_dictionary.h:9
Definition thread_safe_queue.h:8