diff options
Diffstat (limited to 'src/components/include/utils/message_queue.h')
-rw-r--r-- | src/components/include/utils/message_queue.h | 50 |
1 files changed, 30 insertions, 20 deletions
diff --git a/src/components/include/utils/message_queue.h b/src/components/include/utils/message_queue.h index e0b3336728..d163ed0886 100644 --- a/src/components/include/utils/message_queue.h +++ b/src/components/include/utils/message_queue.h @@ -34,10 +34,10 @@ #define SRC_COMPONENTS_INCLUDE_UTILS_MESSAGE_QUEUE_H_ #include <queue> +#include <algorithm> #include "utils/conditional_variable.h" #include "utils/lock.h" -#include "utils/logger.h" #include "utils/prioritized_queue.h" /** @@ -84,10 +84,11 @@ template<typename T, class Q = std::queue<T> > class MessageQueue { void push(const T& element); /** - * \brief Removes element from the queue and returns it. - * \return To element of the queue. + * \brief Removes element from the queue and returns it + * \param element Element to be returned + * \return True on success, false if queue is empty */ - T pop(); + bool pop(T& element); /** * \brief Conditional wait. @@ -95,6 +96,12 @@ template<typename T, class Q = std::queue<T> > class MessageQueue { void wait(); /** + * \brief waitUntilEmpty message queue + * Wait until message queue is empty + */ + void WaitUntilEmpty(); + + /** * \brief Shutdown the queue. * This leads to waking up everyone waiting on the queue * Queue being shut down can be drained ( with pop() ) @@ -127,10 +134,6 @@ template<typename T, class Q> MessageQueue<T, Q>::MessageQueue() } template<typename T, class Q> MessageQueue<T, Q>::~MessageQueue() { - if (!queue_.empty()) { - CREATE_LOGGERPTR_LOCAL(logger_, "Utils") - LOG4CXX_ERROR(logger_, "Destruction of non-drained queue"); - } } template<typename T, class Q> void MessageQueue<T, Q>::wait() { @@ -140,6 +143,13 @@ template<typename T, class Q> void MessageQueue<T, Q>::wait() { } } +template<typename T, class Q> void MessageQueue<T, Q>::WaitUntilEmpty() { + sync_primitives::AutoLock auto_lock(queue_lock_); + while ((!shutting_down_) && !queue_.empty()) { + queue_new_items_.Wait(auto_lock); + } +} + template<typename T, class Q> size_t MessageQueue<T, Q>::size() const { sync_primitives::AutoLock auto_lock(queue_lock_); return queue_.size(); @@ -158,31 +168,31 @@ template<typename T, class Q> void MessageQueue<T, Q>::push(const T& element) { { sync_primitives::AutoLock auto_lock(queue_lock_); if (shutting_down_) { - CREATE_LOGGERPTR_LOCAL(logger_, "Utils") - LOG4CXX_ERROR(logger_, "Runtime error, pushing into queue" - " that is being shut down"); - return; + return; } - queue_.push(element); + queue_.push(element); } queue_new_items_.Broadcast(); } -template<typename T, class Q> T MessageQueue<T, Q>::pop() { +template<typename T, class Q> bool MessageQueue<T, Q>::pop(T& element) { sync_primitives::AutoLock auto_lock(queue_lock_); if (queue_.empty()) { - CREATE_LOGGERPTR_LOCAL(logger_, "Utils") - LOG4CXX_ERROR(logger_, "Runtime error, popping out of empty queue"); - NOTREACHED(); + return false; } - T result = queue_.front(); + element = queue_.front(); queue_.pop(); - return result; + queue_new_items_.NotifyOne(); + return true; } template<typename T, class Q> void MessageQueue<T, Q>::Shutdown() { sync_primitives::AutoLock auto_lock(queue_lock_); shutting_down_ = true; + if (!queue_.empty()) { + Queue empty_queue; + std::swap(queue_, empty_queue); + } queue_new_items_.Broadcast(); } @@ -191,7 +201,7 @@ template<typename T, class Q> void MessageQueue<T, Q>::Reset() { shutting_down_ = false; if (!queue_.empty()) { Queue empty_queue; - queue_.swap(empty_queue); + std::swap(queue_, empty_queue); } } |