diff options
Diffstat (limited to 'cpp/src/qpid/sys/ConcurrentQueue.h')
-rw-r--r-- | cpp/src/qpid/sys/ConcurrentQueue.h | 90 |
1 files changed, 45 insertions, 45 deletions
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index 917afc5704..cf8199954e 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,7 @@ * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" #include "qpid/sys/ScopedIncrement.h" #include <boost/bind.hpp> @@ -39,73 +39,73 @@ namespace sys { * * Also allows consuming threads to wait until an item is available. */ -template <class T> class ConcurrentQueue { +template <class T> class ConcurrentQueue : public Waitable { public: - ConcurrentQueue() : waiters(0), shutdown(false) {} + struct ShutdownException {}; + + ConcurrentQueue() : shutdownFlag(false) {} - /** Threads in wait() are woken with ShutdownException before - * destroying the queue. - */ - ~ConcurrentQueue() { - Mutex::ScopedLock l(lock); - shutdown = true; - lock.notifyAll(); - while (waiters > 0) - lock.wait(); + /** Waiting threads are notified by ~Waitable */ + ~ConcurrentQueue() { shutdown(); } + + bool shutdown(bool wait=true) { + ScopedLock l(lock); + if (!shutdownFlag) { + shutdownFlag=true; + lock.notifyAll(); + if (wait) lock.waitAll(); + shutdownFlag=true; + return true; + } + return false; } - + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); queue.push_back(data); + lock.notify(); } /** If the queue is non-empty, pop the front item into data and * return true. If the queue is empty, return false */ - bool pop(T& data) { + bool tryPop(T& data) { Mutex::ScopedLock l(lock); - return popInternal(data); + if (shutdownFlag || queue.empty()) + return false; + data = queue.front(); + queue.pop_front(); + return true; } - /** Wait up to deadline for a data item to be available. - *@return true if data was available, false if timed out. + /** Wait up to a timeout for a data item to be available. + *@return true if data was available, false if timed out or shut down. *@throws ShutdownException if the queue is destroyed. */ - bool waitPop(T& data, Duration timeout) { - Mutex::ScopedLock l(lock); - ScopedIncrement<size_t> w( - waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + bool waitPop(T& data, Duration timeout=TIME_INFINITE) { + ScopedLock l(lock); AbsTime deadline(now(), timeout); - while (queue.empty() && lock.wait(deadline)) - ; - return popInternal(data); - } - - private: - - bool popInternal(T& data) { - if (shutdown) - throw ShutdownException(); + { + ScopedWait(*this); + while (!shutdownFlag && queue.empty()) + if (!lock.wait(deadline)) + return false; + } if (queue.empty()) return false; - else { - data = queue.front(); - queue.pop_front(); - return true; - } + data = queue.front(); + queue.pop_front(); + return true; } + + bool isShutdown() { ScopedLock l(lock); return shutdownFlag; } - void noWaiters() { - assert(waiters == 0); - if (shutdown) - lock.notify(); // Notify dtor thread. - } - - Monitor lock; + protected: + Waitable lock; + private: std::deque<T> queue; - size_t waiters; - bool shutdown; + bool shutdownFlag; }; }} // namespace qpid::sys |