summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/ConcurrentQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h')
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h90
1 files changed, 45 insertions, 45 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index 917afc5704..cf8199954e 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
#include "qpid/sys/ScopedIncrement.h"
#include <boost/bind.hpp>
@@ -39,73 +39,73 @@ namespace sys {
*
* Also allows consuming threads to wait until an item is available.
*/
-template <class T> class ConcurrentQueue {
+template <class T> class ConcurrentQueue : public Waitable {
public:
- ConcurrentQueue() : waiters(0), shutdown(false) {}
+ struct ShutdownException {};
+
+ ConcurrentQueue() : shutdownFlag(false) {}
- /** Threads in wait() are woken with ShutdownException before
- * destroying the queue.
- */
- ~ConcurrentQueue() {
- Mutex::ScopedLock l(lock);
- shutdown = true;
- lock.notifyAll();
- while (waiters > 0)
- lock.wait();
+ /** Waiting threads are notified by ~Waitable */
+ ~ConcurrentQueue() { shutdown(); }
+
+ bool shutdown(bool wait=true) {
+ ScopedLock l(lock);
+ if (!shutdownFlag) {
+ shutdownFlag=true;
+ lock.notifyAll();
+ if (wait) lock.waitAll();
+ shutdownFlag=true;
+ return true;
+ }
+ return false;
}
-
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
queue.push_back(data);
+ lock.notify();
}
/** If the queue is non-empty, pop the front item into data and
* return true. If the queue is empty, return false
*/
- bool pop(T& data) {
+ bool tryPop(T& data) {
Mutex::ScopedLock l(lock);
- return popInternal(data);
+ if (shutdownFlag || queue.empty())
+ return false;
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
- /** Wait up to deadline for a data item to be available.
- *@return true if data was available, false if timed out.
+ /** Wait up to a timeout for a data item to be available.
+ *@return true if data was available, false if timed out or shut down.
*@throws ShutdownException if the queue is destroyed.
*/
- bool waitPop(T& data, Duration timeout) {
- Mutex::ScopedLock l(lock);
- ScopedIncrement<size_t> w(
- waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ bool waitPop(T& data, Duration timeout=TIME_INFINITE) {
+ ScopedLock l(lock);
AbsTime deadline(now(), timeout);
- while (queue.empty() && lock.wait(deadline))
- ;
- return popInternal(data);
- }
-
- private:
-
- bool popInternal(T& data) {
- if (shutdown)
- throw ShutdownException();
+ {
+ ScopedWait(*this);
+ while (!shutdownFlag && queue.empty())
+ if (!lock.wait(deadline))
+ return false;
+ }
if (queue.empty())
return false;
- else {
- data = queue.front();
- queue.pop_front();
- return true;
- }
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
+
+ bool isShutdown() { ScopedLock l(lock); return shutdownFlag; }
- void noWaiters() {
- assert(waiters == 0);
- if (shutdown)
- lock.notify(); // Notify dtor thread.
- }
-
- Monitor lock;
+ protected:
+ Waitable lock;
+ private:
std::deque<T> queue;
- size_t waiters;
- bool shutdown;
+ bool shutdownFlag;
};
}} // namespace qpid::sys