summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp66
1 files changed, 40 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 40f249bc11..ad06b6ecaa 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -202,19 +202,6 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-/**
- * Return true if the message can be excluded. This is currently the
- * case if the queue is exclusive and has an exclusive consumer that
- * doesn't want the message or has a single consumer that doesn't want
- * the message (covers the JMS topic case).
- */
-bool Queue::canExcludeUnwanted()
-{
- Mutex::ScopedLock locker(consumerLock);
- return hasExclusiveOwner() && (exclusive || consumerCount == 1);
-}
-
-
bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
{
if (c.preAcquires()) {
@@ -252,15 +239,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
}
} else {
//consumer will never want this message
- if (canExcludeUnwanted()) {
- //hack for no-local on JMS topics; get rid of this message
- QPID_LOG(debug, "Excluding message from '" << name << "'");
- pop();
- } else {
- //leave it for another consumer
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- return false;
- }
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ return false;
}
}
}
@@ -291,22 +271,35 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
return false;
}
+/**
+ * notify listeners that there may be messages to process
+ */
void Queue::notify()
{
- //notify listeners that there may be messages to process
- for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify));
+ if (listeners.empty()) return;
+
+ Listeners copy(listeners);
listeners.clear();
+
+ sys::ScopedLock<Guard> g(notifierLock);//prevent consumers being deleted while held in copy
+ {
+ Mutex::ScopedUnlock u(messageLock);
+ for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
+ }
}
void Queue::removeListener(Consumer& c)
{
Mutex::ScopedLock locker(messageLock);
- listeners.erase(&c);
+ notifierLock.wait(messageLock);//wait until no notifies are in progress
+ Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+ if (i != listeners.end()) listeners.erase(i);
}
void Queue::addListener(Consumer& c)
{
- listeners.insert(&c);
+ Listeners::iterator i = find(listeners.begin(), listeners.end(), &c);
+ if (i == listeners.end()) listeners.push_back(&c);
}
bool Queue::dispatch(Consumer& c)
@@ -682,6 +675,27 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+/*
+ * Use of Guard requires an external lock to be held before calling
+ * any of its methods
+ */
+Queue::Guard::Guard() : count(0) {}
+
+void Queue::Guard::lock()
+{
+ count++;
+}
+
+void Queue::Guard::unlock()
+{
+ if (--count == 0) condition.notifyAll();
+}
+
+void Queue::Guard::wait(sys::Mutex& m)
+{
+ while (count) condition.wait(m);
+}
+
ManagementObject::shared_ptr Queue::GetManagementObject (void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);