25 #ifndef __mqtt_thread_queue_h 26 #define __mqtt_thread_queue_h 30 #include <condition_variable> 68 template <
typename T,
class Container=std::deque<T>>
84 mutable std::mutex lock_;
86 std::condition_variable notEmptyCond_;
88 std::condition_variable notFullCond_;
92 std::queue<T,Container> que_;
95 using guard = std::lock_guard<std::mutex>;
97 using unique_guard = std::unique_lock<std::mutex>;
152 unique_guard g(lock_);
153 notFullCond_.wait(g, [
this]{
return que_.size() < cap_;});
155 que_.emplace(std::move(val));
157 notEmptyCond_.notify_one();
166 unique_guard g(lock_);
167 if (que_.size() >= cap_)
170 que_.emplace(std::move(val));
172 notEmptyCond_.notify_one();
184 template <
typename Rep,
class Period>
186 unique_guard g(lock_);
187 if (!notFullCond_.wait_for(g, relTime, [
this]{return que_.size() < cap_;}))
190 que_.emplace(std::move(val));
192 notEmptyCond_.notify_one();
205 template <
class Clock,
class Duration>
207 unique_guard g(lock_);
208 if (!notFullCond_.wait_until(g, absTime, [
this]{return que_.size() < cap_;}))
211 que_.emplace(std::move(val));
213 notEmptyCond_.notify_one();
226 unique_guard g(lock_);
227 notEmptyCond_.wait(g, [
this]{
return !que_.empty();});
229 *val = std::move(que_.front());
232 notFullCond_.notify_one();
241 unique_guard g(lock_);
242 notEmptyCond_.wait(g, [
this]{
return !que_.empty();});
247 notFullCond_.notify_one();
262 unique_guard g(lock_);
266 *val = std::move(que_.front());
269 notFullCond_.notify_one();
282 template <
typename Rep,
class Period>
287 unique_guard g(lock_);
288 if (!notEmptyCond_.wait_for(g, relTime, [
this]{return !que_.empty();}))
291 *val = std::move(que_.front());
294 notFullCond_.notify_one();
307 template <
class Clock,
class Duration>
312 unique_guard g(lock_);
313 if (!notEmptyCond_.wait_until(g, absTime, [
this]{return !que_.empty();}))
316 *val = std::move(que_.front());
319 notFullCond_.notify_one();
328 #endif // __mqtt_thread_queue_h
void capacity(size_type cap)
Definition: thread_queue.h:133
bool empty() const
Definition: thread_queue.h:115
thread_queue(size_t cap)
Definition: thread_queue.h:109
bool try_get(value_type *val)
Definition: thread_queue.h:258
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: thread_queue.h:308
Container container_type
Definition: thread_queue.h:73
static constexpr size_type MAX_CAPACITY
Definition: thread_queue.h:80
typename Container::size_type size_type
Definition: thread_queue.h:77
thread_queue()
Definition: thread_queue.h:103
thread_queue
Definition: thread_queue.h:69
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: thread_queue.h:206
size_type capacity() const
Definition: thread_queue.h:123
bool try_put(value_type val)
Definition: thread_queue.h:165
void put(value_type val)
Definition: thread_queue.h:151
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition: thread_queue.h:283
Definition: async_client.h:49
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition: thread_queue.h:185
T value_type
Definition: thread_queue.h:75
size_type size() const
Definition: thread_queue.h:141