summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp47
1 files changed, 40 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 90b0d4cc52..42c678cb68 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -103,7 +103,8 @@ Queue::Queue(const string& _name, bool _autodelete,
eventMgr(0),
insertSeqNo(0),
broker(b),
- deleted(false)
+ deleted(false),
+ barrier(*this)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -718,14 +719,17 @@ void Queue::setLastNodeFailure()
}
}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return false;
+
if (policy.get() && !suppressPolicyCheck) {
Messages dequeues;
{
Mutex::ScopedLock locker(messageLock);
- if (deleted) return false;
policy->tryEnqueue(msg);
policy->getPendingDequeues(dequeues);
}
@@ -765,9 +769,12 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return false;
+
{
Mutex::ScopedLock locker(messageLock);
- if (deleted || !isEnqueued(msg)) return false;
+ if (!isEnqueued(msg)) return false;
if (!ctxt) {
dequeued(msg);
}
@@ -893,10 +900,10 @@ void Queue::destroy()
popAndDequeue();
}
alternateExchange->decAlternateUsers();
- deleted = true;
}
if (store) {
+ barrier.destroy();
store->flush(*this);
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
@@ -1172,8 +1179,34 @@ void Queue::checkNotDeleted()
}
}
-bool Queue::isValid()
+void Queue::flush()
{
- Mutex::ScopedLock locker(messageLock);
- return !deleted;
+ ScopedUse u(barrier);
+ if (u.acquired && store) store->flush(*this);
+}
+
+Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
+
+bool Queue::UsageBarrier::acquire()
+{
+ Monitor::ScopedLock l(parent.messageLock);
+ if (parent.deleted) {
+ return false;
+ } else {
+ ++count;
+ return true;
+ }
+}
+
+void Queue::UsageBarrier::release()
+{
+ Monitor::ScopedLock l(parent.messageLock);
+ if (--count == 0) parent.messageLock.notifyAll();
+}
+
+void Queue::UsageBarrier::destroy()
+{
+ Monitor::ScopedLock l(parent.messageLock);
+ parent.deleted = true;
+ while (count) parent.messageLock.wait();
}