diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 66 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 16 |
2 files changed, 53 insertions, 29 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40f249bc11..ad06b6ecaa 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -202,19 +202,6 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -/** - * Return true if the message can be excluded. This is currently the - * case if the queue is exclusive and has an exclusive consumer that - * doesn't want the message or has a single consumer that doesn't want - * the message (covers the JMS topic case). - */ -bool Queue::canExcludeUnwanted() -{ - Mutex::ScopedLock locker(consumerLock); - return hasExclusiveOwner() && (exclusive || consumerCount == 1); -} - - bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) { if (c.preAcquires()) { @@ -252,15 +239,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) } } else { //consumer will never want this message - if (canExcludeUnwanted()) { - //hack for no-local on JMS topics; get rid of this message - QPID_LOG(debug, "Excluding message from '" << name << "'"); - pop(); - } else { - //leave it for another consumer - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - return false; - } + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + return false; } } } @@ -291,22 +271,35 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) return false; } +/** + * notify listeners that there may be messages to process + */ void Queue::notify() { - //notify listeners that there may be messages to process - for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify)); + if (listeners.empty()) return; + + Listeners copy(listeners); listeners.clear(); + + sys::ScopedLock<Guard> g(notifierLock);//prevent consumers being deleted while held in copy + { + Mutex::ScopedUnlock u(messageLock); + for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify)); + } } void Queue::removeListener(Consumer& c) { Mutex::ScopedLock locker(messageLock); - listeners.erase(&c); + notifierLock.wait(messageLock);//wait until no notifies are in progress + Listeners::iterator i = find(listeners.begin(), listeners.end(), &c); + if (i != listeners.end()) listeners.erase(i); } void Queue::addListener(Consumer& c) { - listeners.insert(&c); + Listeners::iterator i = find(listeners.begin(), listeners.end(), &c); + if (i == listeners.end()) listeners.push_back(&c); } bool Queue::dispatch(Consumer& c) @@ -682,6 +675,27 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +/* + * Use of Guard requires an external lock to be held before calling + * any of its methods + */ +Queue::Guard::Guard() : count(0) {} + +void Queue::Guard::lock() +{ + count++; +} + +void Queue::Guard::unlock() +{ + if (--count == 0) condition.notifyAll(); +} + +void Queue::Guard::wait(sys::Mutex& m) +{ + while (count) condition.wait(m); +} + ManagementObject::shared_ptr Queue::GetManagementObject (void) const { return dynamic_pointer_cast<ManagementObject> (mgmtObject); diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index f56cee0f22..29c6005d60 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -38,7 +38,6 @@ #include <vector> #include <memory> #include <deque> -#include <set> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> @@ -62,9 +61,20 @@ namespace qpid { */ class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { - typedef std::set<Consumer*> Listeners; + typedef qpid::InlineVector<Consumer*, 5> Listeners; typedef std::deque<QueuedMessage> Messages; + class Guard + { + qpid::sys::Condition condition; + size_t count; + public: + Guard(); + void lock(); + void unlock(); + void wait(sys::Mutex&); + }; + const string name; const bool autodelete; MessageStore* store; @@ -79,6 +89,7 @@ namespace qpid { mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; + Guard notifierLock; mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; @@ -95,7 +106,6 @@ namespace qpid { bool getNextMessage(QueuedMessage& msg, Consumer& c); bool consumeNextMessage(QueuedMessage& msg, Consumer& c); bool browseNextMessage(QueuedMessage& msg, Consumer& c); - bool canExcludeUnwanted(); void notify(); void removeListener(Consumer&); |