diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 66 |
1 files changed, 40 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40f249bc11..ad06b6ecaa 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -202,19 +202,6 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -/** - * Return true if the message can be excluded. This is currently the - * case if the queue is exclusive and has an exclusive consumer that - * doesn't want the message or has a single consumer that doesn't want - * the message (covers the JMS topic case). - */ -bool Queue::canExcludeUnwanted() -{ - Mutex::ScopedLock locker(consumerLock); - return hasExclusiveOwner() && (exclusive || consumerCount == 1); -} - - bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) { if (c.preAcquires()) { @@ -252,15 +239,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) } } else { //consumer will never want this message - if (canExcludeUnwanted()) { - //hack for no-local on JMS topics; get rid of this message - QPID_LOG(debug, "Excluding message from '" << name << "'"); - pop(); - } else { - //leave it for another consumer - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - return false; - } + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + return false; } } } @@ -291,22 +271,35 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) return false; } +/** + * notify listeners that there may be messages to process + */ void Queue::notify() { - //notify listeners that there may be messages to process - for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify)); + if (listeners.empty()) return; + + Listeners copy(listeners); listeners.clear(); + + sys::ScopedLock<Guard> g(notifierLock);//prevent consumers being deleted while held in copy + { + Mutex::ScopedUnlock u(messageLock); + for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify)); + } } void Queue::removeListener(Consumer& c) { Mutex::ScopedLock locker(messageLock); - listeners.erase(&c); + notifierLock.wait(messageLock);//wait until no notifies are in progress + Listeners::iterator i = find(listeners.begin(), listeners.end(), &c); + if (i != listeners.end()) listeners.erase(i); } void Queue::addListener(Consumer& c) { - listeners.insert(&c); + Listeners::iterator i = find(listeners.begin(), listeners.end(), &c); + if (i == listeners.end()) listeners.push_back(&c); } bool Queue::dispatch(Consumer& c) @@ -682,6 +675,27 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +/* + * Use of Guard requires an external lock to be held before calling + * any of its methods + */ +Queue::Guard::Guard() : count(0) {} + +void Queue::Guard::lock() +{ + count++; +} + +void Queue::Guard::unlock() +{ + if (--count == 0) condition.notifyAll(); +} + +void Queue::Guard::wait(sys::Mutex& m) +{ + while (count) condition.wait(m); +} + ManagementObject::shared_ptr Queue::GetManagementObject (void) const { return dynamic_pointer_cast<ManagementObject> (mgmtObject); |