summaryrefslogtreecommitdiff
path: root/src/components/include/utils/message_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/include/utils/message_queue.h')
-rw-r--r--src/components/include/utils/message_queue.h50
1 files changed, 30 insertions, 20 deletions
diff --git a/src/components/include/utils/message_queue.h b/src/components/include/utils/message_queue.h
index e0b3336728..d163ed0886 100644
--- a/src/components/include/utils/message_queue.h
+++ b/src/components/include/utils/message_queue.h
@@ -34,10 +34,10 @@
#define SRC_COMPONENTS_INCLUDE_UTILS_MESSAGE_QUEUE_H_
#include <queue>
+#include <algorithm>
#include "utils/conditional_variable.h"
#include "utils/lock.h"
-#include "utils/logger.h"
#include "utils/prioritized_queue.h"
/**
@@ -84,10 +84,11 @@ template<typename T, class Q = std::queue<T> > class MessageQueue {
void push(const T& element);
/**
- * \brief Removes element from the queue and returns it.
- * \return To element of the queue.
+ * \brief Removes element from the queue and returns it
+ * \param element Element to be returned
+ * \return True on success, false if queue is empty
*/
- T pop();
+ bool pop(T& element);
/**
* \brief Conditional wait.
@@ -95,6 +96,12 @@ template<typename T, class Q = std::queue<T> > class MessageQueue {
void wait();
/**
+ * \brief waitUntilEmpty message queue
+ * Wait until message queue is empty
+ */
+ void WaitUntilEmpty();
+
+ /**
* \brief Shutdown the queue.
* This leads to waking up everyone waiting on the queue
* Queue being shut down can be drained ( with pop() )
@@ -127,10 +134,6 @@ template<typename T, class Q> MessageQueue<T, Q>::MessageQueue()
}
template<typename T, class Q> MessageQueue<T, Q>::~MessageQueue() {
- if (!queue_.empty()) {
- CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
- LOG4CXX_ERROR(logger_, "Destruction of non-drained queue");
- }
}
template<typename T, class Q> void MessageQueue<T, Q>::wait() {
@@ -140,6 +143,13 @@ template<typename T, class Q> void MessageQueue<T, Q>::wait() {
}
}
+template<typename T, class Q> void MessageQueue<T, Q>::WaitUntilEmpty() {
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ while ((!shutting_down_) && !queue_.empty()) {
+ queue_new_items_.Wait(auto_lock);
+ }
+}
+
template<typename T, class Q> size_t MessageQueue<T, Q>::size() const {
sync_primitives::AutoLock auto_lock(queue_lock_);
return queue_.size();
@@ -158,31 +168,31 @@ template<typename T, class Q> void MessageQueue<T, Q>::push(const T& element) {
{
sync_primitives::AutoLock auto_lock(queue_lock_);
if (shutting_down_) {
- CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
- LOG4CXX_ERROR(logger_, "Runtime error, pushing into queue"
- " that is being shut down");
- return;
+ return;
}
- queue_.push(element);
+ queue_.push(element);
}
queue_new_items_.Broadcast();
}
-template<typename T, class Q> T MessageQueue<T, Q>::pop() {
+template<typename T, class Q> bool MessageQueue<T, Q>::pop(T& element) {
sync_primitives::AutoLock auto_lock(queue_lock_);
if (queue_.empty()) {
- CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
- LOG4CXX_ERROR(logger_, "Runtime error, popping out of empty queue");
- NOTREACHED();
+ return false;
}
- T result = queue_.front();
+ element = queue_.front();
queue_.pop();
- return result;
+ queue_new_items_.NotifyOne();
+ return true;
}
template<typename T, class Q> void MessageQueue<T, Q>::Shutdown() {
sync_primitives::AutoLock auto_lock(queue_lock_);
shutting_down_ = true;
+ if (!queue_.empty()) {
+ Queue empty_queue;
+ std::swap(queue_, empty_queue);
+ }
queue_new_items_.Broadcast();
}
@@ -191,7 +201,7 @@ template<typename T, class Q> void MessageQueue<T, Q>::Reset() {
shutting_down_ = false;
if (!queue_.empty()) {
Queue empty_queue;
- queue_.swap(empty_queue);
+ std::swap(queue_, empty_queue);
}
}