diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 291 |
1 files changed, 127 insertions, 164 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c43ab8c231..4dba60cd0d 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,7 +31,8 @@ #include <iostream> #include <boost/bind.hpp> #include "QueueRegistry.h" - +#include <algorithm> +#include <functional> using namespace qpid::broker; using namespace qpid::sys; @@ -40,6 +41,8 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using std::for_each; +using std::mem_fun; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -50,10 +53,9 @@ Queue::Queue(const string& _name, bool _autodelete, autodelete(_autodelete), store(_store), owner(_owner), - next(0), - persistenceId(0), - serializer(false), - dispatchCallback(*this) + consumerCount(0), + exclusive(false), + persistenceId(0) { if (parent != 0) { @@ -73,9 +75,8 @@ Queue::~Queue() void Queue::notifyDurableIOComplete() { - // signal SemanticHander to ack completed dequeues - // then dispatch to ack... - serializer.execute(dispatchCallback); + Mutex::ScopedLock locker(messageLock); + notify(); } @@ -110,7 +111,6 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); - serializer.execute(dispatchCallback); } } @@ -148,17 +148,13 @@ void Queue::process(intrusive_ptr<Message>& msg){ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } } - serializer.execute(dispatchCallback); - } void Queue::requeue(const QueuedMessage& msg){ - { - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); - } - serializer.execute(dispatchCallback); + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); + notify(); } bool Queue::acquire(const QueuedMessage& msg) { @@ -172,186 +168,170 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -void Queue::requestDispatch(Consumer::ptr c){ - if (!c || c->preAcquires()) { - serializer.execute(dispatchCallback); - } else { - DispatchFunctor f(*this, c); - serializer.execute(f); - } -} - -void Queue::flush(DispatchCompletion& completion) -{ - DispatchFunctor f(*this, &completion); - serializer.execute(f); -} - /** * Return true if the message can be excluded. This is currently the - * case if the queue has an exclusive consumer that will never want - * the message, or if the queue is exclusive to a single connection - * and has a single consumer (covers the JMS topic case). + * case if the queue is exclusive and has an exclusive consumer that + * doesn't want the message or has a single consumer that doesn't want + * the message (covers the JMS topic case). */ -bool Queue::exclude(intrusive_ptr<Message> msg) +bool Queue::canExcludeUnwanted() +{ + Mutex::ScopedLock locker(consumerLock); + return hasExclusiveOwner() && (exclusive || consumerCount == 1); +} + + +bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) { - RWlock::ScopedWlock locker(consumerLock); - if (exclusive) { - return !exclusive->filter(msg); - } else if (hasExclusiveOwner() && acquirers.size() == 1) { - return !acquirers[0]->filter(msg); + if (c.preAcquires()) { + return consumeNextMessage(m, c); } else { - return false; + return browseNextMessage(m, c); } } -Consumer::ptr Queue::allocate() +bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) { - RWlock::ScopedWlock locker(consumerLock); - - if (acquirers.empty()) { - return Consumer::ptr(); - } else if (exclusive){ - return exclusive; - } else { - next = next % acquirers.size(); - return acquirers[next++]; + while (true) { + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + addListener(c); + return false; + } else { + QueuedMessage msg = messages.front(); + if (!msg.payload->isEnqueueComplete()) { + QPID_LOG(debug, "Messages not ready to dispatch on queue '" << name << "'"); + addListener(c); + return false; + } + + if (c.filter(msg.payload)) { + if (c.accept(msg.payload)) { + m = msg; + pop(); + return true; + } else { + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + return false; + } + } else { + //consumer will never want this message + if (canExcludeUnwanted()) { + //hack for no-local on JMS topics; get rid of this message + QPID_LOG(debug, "Excluding message from '" << name << "'"); + pop(); + } else { + //leave it for another consumer + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + return false; + } + } + } } } -bool Queue::dispatch(QueuedMessage& msg) + +bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) { - QPID_LOG(info, "Dispatch message " << msg.position << " from queue " << name); - //additions to the acquirers will result in a separate dispatch - //request, so won't result in anyone being missed - uint counter = getAcquirerCount(); - Consumer::ptr c = allocate(); - while (c && counter--){ - if (c->deliver(msg)) { - return true; + QueuedMessage msg(this); + while (seek(msg, c)) { + if (c.filter(msg.payload)) { + if (c.accept(msg.payload)) { + //consumer wants the message + c.position = msg.position; + m = msg; + return true; + } else { + //consumer hasn't got enough credit for the message + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + return false; + } } else { - c = allocate(); + //consumer will never want this message, continue seeking + c.position = msg.position; + QPID_LOG(debug, "Browser skipping message from '" << name << "'"); } } return false; } -bool Queue::getNextMessage(QueuedMessage& msg) +void Queue::notify() { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - return false; - } else { - msg = messages.front(); - return true; - } + //notify listeners that there may be messages to process + for_each(listeners.begin(), listeners.end(), mem_fun(&Consumer::notify)); + listeners.clear(); } -void Queue::dispatch() +void Queue::removeListener(Consumer& c) { - QueuedMessage msg(this); - while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ - if (dispatch(msg)) { - pop(); - } else if (exclude(msg.payload)) { - pop(); - dequeue(0, msg.payload); - QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]"); - } else { - break; - } - } - serviceAllBrowsers(); -} - -void Queue::serviceAllBrowsers() + Mutex::ScopedLock locker(messageLock); + listeners.erase(&c); +} + +void Queue::addListener(Consumer& c) { - Consumers copy; - { - RWlock::ScopedRlock locker(consumerLock); - if (browsers.empty()) return;//shortcut - copy = browsers; - } - for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) { - serviceBrowser(*i); - } -} - -void Queue::serviceBrowser(Consumer::ptr browser) + listeners.insert(&c); +} + +bool Queue::dispatch(Consumer& c) { QueuedMessage msg(this); - while (seek(msg, browser->position) && browser->deliver(msg)) { - browser->position = msg.position; + if (getNextMessage(msg, c)) { + c.deliver(msg); + return true; + } else { + return false; } } -bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { +bool Queue::seek(QueuedMessage& msg, Consumer& c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > position) { - if (position < messages.front().position) { + if (!messages.empty() && messages.back().position > c.position) { + if (c.position < messages.front().position) { msg = messages.front(); return true; } else { - uint index = (position - messages.front().position) + 1; + uint index = (c.position - messages.front().position) + 1; if (index < messages.size()) { msg = messages[index]; return true; } } } + addListener(c); return false; } -void Queue::consume(Consumer::ptr c, bool requestExclusive){ - RWlock::ScopedWlock locker(consumerLock); +void Queue::consume(Consumer&, bool requestExclusive){ + Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw AccessRefusedException( QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } - if(requestExclusive) { - if(acquirers.empty() && browsers.empty()) { - exclusive = c; - } else { + } else if(requestExclusive) { + if(consumerCount) { throw AccessRefusedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } - } - if (c->preAcquires()) { - acquirers.push_back(c); - } else { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { - c->position = SequenceNumber(sequence.getValue() - 1); } else { - c->position = SequenceNumber(messages.front().position.getValue() - 1); + exclusive = true; } - browsers.push_back(c); } + consumerCount++; if (mgmtObject != 0){ mgmtObject->inc_consumers (); } } -void Queue::cancel(Consumer::ptr c){ - RWlock::ScopedWlock locker(consumerLock); - if (c->preAcquires()) { - cancel(c, acquirers); - } else { - cancel(c, browsers); - } +void Queue::cancel(Consumer& c){ + removeListener(c); + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = false; if (mgmtObject != 0){ mgmtObject->dec_consumers (); } - if(exclusive == c) exclusive.reset(); -} - -void Queue::cancel(Consumer::ptr c, Consumers& consumers) -{ - Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); - if (i != consumers.end()) - consumers.erase(i); } QueuedMessage Queue::dequeue(){ @@ -382,14 +362,16 @@ uint32_t Queue::purge(){ return count; } +/** + * Assumes messageLock is held + */ void Queue::pop(){ - Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); messages.pop_front(); } void Queue::push(intrusive_ptr<Message>& msg){ - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); if (policy.get()) { policy->enqueued(msg->contentSize()); @@ -397,6 +379,7 @@ void Queue::push(intrusive_ptr<Message>& msg){ msg->releaseContent(store); } } + notify(); } /** function only provided for unit tests, or code not in critical message path */ @@ -412,18 +395,13 @@ uint32_t Queue::getMessageCount() const{ } uint32_t Queue::getConsumerCount() const{ - RWlock::ScopedRlock locker(consumerLock); - return acquirers.size() + browsers.size(); -} - -uint32_t Queue::getAcquirerCount() const{ - RWlock::ScopedRlock locker(consumerLock); - return acquirers.size(); + Mutex::ScopedLock locker(consumerLock); + return consumerCount; } bool Queue::canAutoDelete() const{ - RWlock::ScopedRlock locker(consumerLock); - return autodelete && acquirers.empty() && browsers.empty(); + Mutex::ScopedLock locker(consumerLock); + return autodelete && !consumerCount; } // return true if store exists, @@ -601,21 +579,6 @@ bool Queue::hasExclusiveConsumer() const return exclusive; } -void Queue::DispatchFunctor::operator()() -{ - try { - if (consumer && !consumer->preAcquires()) { - queue.serviceBrowser(consumer); - }else{ - queue.dispatch(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "Exception on dispatch: " << e.what()); - } - - if (sync) sync->completed(); -} - ManagementObject::shared_ptr Queue::GetManagementObject (void) const { return dynamic_pointer_cast<ManagementObject> (mgmtObject); |