summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-05-15 17:05:01 +0000
committerGordon Sim <gsim@apache.org>2010-05-15 17:05:01 +0000
commitd745be62c6477f03c5e4734a39c2be49943561c0 (patch)
tree4307689e41914ae30b7b809c50573db2d0df01af /cpp/src
parent94c33a9ea95ee56bb9e12018d053bfe68d86927f (diff)
downloadqpid-python-d745be62c6477f03c5e4734a39c2be49943561c0.tar.gz
QPID-2588: Prevent queue being destroyed while still in use.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@944683 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp4
-rw-r--r--cpp/src/qpid/broker/PersistableQueue.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp47
-rw-r--r--cpp/src/qpid/broker/Queue.h26
-rw-r--r--cpp/src/tests/IncompleteMessageList.cpp2
5 files changed, 68 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 21fb421168..e5fbb71cbd 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -52,8 +52,8 @@ void PersistableMessage::flush()
}
for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) {
PersistableQueue::shared_ptr q(i->lock());
- if (q && q->isValid()) {
- store->flush(*q);
+ if (q) {
+ q->flush();
}
}
}
diff --git a/cpp/src/qpid/broker/PersistableQueue.h b/cpp/src/qpid/broker/PersistableQueue.h
index bcf4268158..655d26bc74 100644
--- a/cpp/src/qpid/broker/PersistableQueue.h
+++ b/cpp/src/qpid/broker/PersistableQueue.h
@@ -60,7 +60,7 @@ public:
};
virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
- virtual bool isValid() = 0;
+ virtual void flush() = 0;
inline ExternalQueueStore* getExternalQueueStore() const {return externalQueueStore;};
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 90b0d4cc52..42c678cb68 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -103,7 +103,8 @@ Queue::Queue(const string& _name, bool _autodelete,
eventMgr(0),
insertSeqNo(0),
broker(b),
- deleted(false)
+ deleted(false),
+ barrier(*this)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -718,14 +719,17 @@ void Queue::setLastNodeFailure()
}
}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return false;
+
if (policy.get() && !suppressPolicyCheck) {
Messages dequeues;
{
Mutex::ScopedLock locker(messageLock);
- if (deleted) return false;
policy->tryEnqueue(msg);
policy->getPendingDequeues(dequeues);
}
@@ -765,9 +769,12 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ ScopedUse u(barrier);
+ if (!u.acquired) return false;
+
{
Mutex::ScopedLock locker(messageLock);
- if (deleted || !isEnqueued(msg)) return false;
+ if (!isEnqueued(msg)) return false;
if (!ctxt) {
dequeued(msg);
}
@@ -893,10 +900,10 @@ void Queue::destroy()
popAndDequeue();
}
alternateExchange->decAlternateUsers();
- deleted = true;
}
if (store) {
+ barrier.destroy();
store->flush(*this);
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
@@ -1172,8 +1179,34 @@ void Queue::checkNotDeleted()
}
}
-bool Queue::isValid()
+void Queue::flush()
{
- Mutex::ScopedLock locker(messageLock);
- return !deleted;
+ ScopedUse u(barrier);
+ if (u.acquired && store) store->flush(*this);
+}
+
+Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
+
+bool Queue::UsageBarrier::acquire()
+{
+ Monitor::ScopedLock l(parent.messageLock);
+ if (parent.deleted) {
+ return false;
+ } else {
+ ++count;
+ return true;
+ }
+}
+
+void Queue::UsageBarrier::release()
+{
+ Monitor::ScopedLock l(parent.messageLock);
+ if (--count == 0) parent.messageLock.notifyAll();
+}
+
+void Queue::UsageBarrier::destroy()
+{
+ Monitor::ScopedLock l(parent.messageLock);
+ parent.deleted = true;
+ while (count) parent.messageLock.wait();
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 51b64ba036..cdfa8a1a1f 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -59,6 +59,8 @@ namespace qpid {
using std::string;
+
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -68,6 +70,25 @@ namespace qpid {
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
+ struct UsageBarrier
+ {
+ Queue& parent;
+ uint count;
+
+ UsageBarrier(Queue&);
+ bool acquire();
+ void release();
+ void destroy();
+ };
+
+ struct ScopedUse
+ {
+ UsageBarrier& barrier;
+ const bool acquired;
+ ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
+ ~ScopedUse() { if (acquired) barrier.release(); }
+ };
+
typedef std::deque<QueuedMessage> Messages;
typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -90,7 +111,7 @@ namespace qpid {
Messages pendingDequeues;//used to avoid dequeuing during recovery
LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
- mutable qpid::sys::Mutex messageLock;
+ mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
framing::FieldTable settings;
@@ -108,6 +129,7 @@ namespace qpid {
std::string seqNoKey;
Broker* broker;
bool deleted;
+ UsageBarrier barrier;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -342,7 +364,7 @@ namespace qpid {
*/
void recoverPrepared(boost::intrusive_ptr<Message>& msg);
- bool isValid();
+ void flush();
};
}
}
diff --git a/cpp/src/tests/IncompleteMessageList.cpp b/cpp/src/tests/IncompleteMessageList.cpp
index 303d83cd66..10782572e5 100644
--- a/cpp/src/tests/IncompleteMessageList.cpp
+++ b/cpp/src/tests/IncompleteMessageList.cpp
@@ -114,7 +114,7 @@ QPID_AUTO_TEST_CASE(testSyncProcessWithIncomplete)
IncompleteMessageList list;
SequenceNumber counter(1);
MockStore store;
- store.queue = Queue::shared_ptr(new Queue("mock-queue"));
+ store.queue = Queue::shared_ptr(new Queue("mock-queue", false, &store));
//fill up list with messages
for (int i = 0; i < 5; i++) {
boost::intrusive_ptr<Message> msg(new Message(counter++));