summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-09-28 12:12:41 +0000
committerGordon Sim <gsim@apache.org>2009-09-28 12:12:41 +0000
commit9376922f0fce58400c1e9b5b20f6c6f7b279a55b (patch)
tree52b9422da0032defb0b3b94caef6ec168e264efd /cpp/src
parent46b7c031c27b7c047d7f2361c4d8287ee1578f05 (diff)
downloadqpid-python-9376922f0fce58400c1e9b5b20f6c6f7b279a55b.tar.gz
QPID-2102: Changed QueuePolicy to rely on external locking and require dequeues to be handled by policy user rather.
(r817742 introduced a deadlock in ring queue policy which this checkin fixes) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp19
-rw-r--r--cpp/src/qpid/broker/Queue.h12
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp15
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h6
-rwxr-xr-xcpp/src/tests/ring_queue_test7
5 files changed, 22 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 80794f791f..5bfec0f24e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -208,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
- if (!isEnqueued(msg)) return;
-
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
+ if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
@@ -603,7 +602,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
if (policy.get()) {
- Mutex::ScopedUnlock locker(messageLock);
policy->enqueued(qm);
}
}
@@ -696,7 +694,14 @@ void Queue::setLastNodeFailure()
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
{
if (policy.get() && !suppressPolicyCheck) {
- policy->tryEnqueue(msg);
+ Messages dequeues;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ policy->tryEnqueue(msg);
+ policy->getPendingDequeues(dequeues);
+ }
+ //depending on policy, may have some dequeues that need to performed without holding the lock
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
@@ -1072,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
return !policy.get() || policy->isEnqueued(msg);
}
-void Queue::addPendingDequeue(const QueuedMessage& msg)
-{
- //assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
-}
-
QueueListeners& Queue::getListeners() { return listeners; }
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9ac5e3f5e9..661e46f619 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -334,18 +334,6 @@ namespace qpid {
*/
void recoveryComplete();
- /**
- * This is a hack to avoid deadlocks in durable ring
- * queues. It is used for dequeueing messages in response
- * to an enqueue while avoid holding lock over call to
- * store.
- *
- * Assumes messageLock is held - true for curent use case
- * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public
- * method
- **/
- void addPendingDequeue(const QueuedMessage &msg);
-
// For cluster update
QueueListeners& getListeners();
};
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 0f1f7f370f..a8aa674c53 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -77,7 +77,6 @@ bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
if (checkLimit(m)) {
enqueued(m->contentSize());
} else {
@@ -87,13 +86,11 @@ void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
enqueued(m->contentSize());
}
void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m->contentSize());
}
@@ -101,7 +98,6 @@ void QueuePolicy::enqueued(const QueuedMessage&) {}
void QueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
dequeued(m.payload->contentSize());
}
@@ -141,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -200,14 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b)
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -217,7 +212,6 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
@@ -241,8 +235,6 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
<< ": removed message " << oldest.position << " to make way for new message");
- qpid::sys::Mutex::ScopedUnlock u(lock);
- oldest.queue->dequeue(0, oldest);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in " << name
@@ -254,6 +246,11 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index 65c52304f2..b2937e94c7 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -47,6 +47,7 @@ class QueuePolicy
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
public:
+ typedef std::deque<QueuedMessage> Messages;
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
@@ -68,7 +69,7 @@ class QueuePolicy
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
-
+ virtual void getPendingDequeues(Messages& result);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
@@ -80,7 +81,6 @@ class QueuePolicy
const QueuePolicy&);
protected:
const std::string name;
- qpid::sys::Mutex lock;
QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
@@ -105,8 +105,8 @@ class RingQueuePolicy : public QueuePolicy
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
bool checkLimit(boost::intrusive_ptr<Message> msg);
+ void getPendingDequeues(Messages& result);
private:
- typedef std::deque<QueuedMessage> Messages;
Messages pendingDequeues;
Messages queue;
const bool strict;
diff --git a/cpp/src/tests/ring_queue_test b/cpp/src/tests/ring_queue_test
index 5805989d7e..553746eb49 100755
--- a/cpp/src/tests/ring_queue_test
+++ b/cpp/src/tests/ring_queue_test
@@ -48,7 +48,7 @@ receive() {
cleanup() {
rm -f sender_${QUEUE_NAME}_* receiver_${QUEUE_NAME}_*
- qpid-config $BROKER_URL add queue $QUEUE_NAME
+ qpid-config $BROKER_URL del queue $QUEUE_NAME --force
}
log() {
@@ -64,10 +64,11 @@ validate() {
if [[ $RECEIVERS -eq 0 ]]; then
#queue should have $LIMIT messages on it, but need to send an eos also
sender --routing-key $QUEUE_NAME --send-eos 1 < /dev/null
- if [[ $(receiver --queue $QUEUE_NAME --browse | wc -l) -eq $(( $LIMIT - 1)) ]]; then
+ received=$(receiver --queue $QUEUE_NAME --browse | wc -l)
+ if [[ received -eq $(( $LIMIT - 1)) ]]; then
log "queue contains $LIMIT messages as expected"
else
- fail "queue does not contain the expected $LIMIT messages"
+ fail "queue does not contain the expected $LIMIT messages (received $received)"
fi
elif [[ $CONCURRENT -eq 0 ]]; then
#sum of length of all output files should be equal to $LIMIT - $RECEIVERS (1 eos message each)