diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 145 |
1 files changed, 82 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 969d510e26..0e822d3d4a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -240,7 +240,7 @@ void Queue::requeue(const QueuedMessage& msg){ } mgntDeqStats(msg.payload); } else { - messages->reinsert(msg); + messages->release(msg); listeners.populate(copy); // for persistLastNode - don't force a message twice to disk, but force it if no force before @@ -306,7 +306,7 @@ void Queue::notifyListener() bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { - checkNotDeleted(); + checkNotDeleted(c); if (c->preAcquires()) { switch (consumeNextMessage(m, c)) { case CONSUMED: @@ -327,48 +327,43 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ while (true) { Mutex::ScopedLock locker(messageLock); QueuedMessage msg; + if (allocator->nextConsumableMessage(c, msg)) { + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->setPosition(msg.position); + dequeue(0, msg); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); + } - if (!allocator->nextConsumableMessage(c, msg)) { // no next available - QPID_LOG(debug, "No messages available to dispatch to consumer " << - c->getName() << " on queue '" << name << "'"); - listeners.addListener(c); - return NO_MESSAGES; - } - - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->setPosition(msg.position); - acquire( msg.position, msg, locker); - dequeue( 0, msg ); - if (mgmtObject) { - mgmtObject->inc_discardsTtl(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(); + continue; } - continue; - } - // a message is available for this consumer - can the consumer use it? - - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - bool ok = allocator->allocate( c->getName(), msg ); // inform allocator - (void) ok; assert(ok); - ok = acquire( msg.position, msg, locker); - (void) ok; assert(ok); - m = msg; - c->setPosition(m.position); - return CONSUMED; + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + bool ok = allocator->allocate( c->getName(), msg ); // inform allocator + (void) ok; assert(ok); + observeAcquire(msg, locker); + m = msg; + return CONSUMED; + } else { + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + messages->release(msg); + return CANT_CONSUME; + } } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + messages->release(msg); return CANT_CONSUME; } } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - c->setPosition(msg.position); - return CANT_CONSUME; + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + listeners.addListener(c); + return NO_MESSAGES; } } } @@ -431,7 +426,6 @@ bool Queue::dispatch(Consumer::shared_ptr c) } bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { - Mutex::ScopedLock locker(messageLock); if (messages->find(pos, msg)) return true; @@ -493,7 +487,7 @@ void Queue::cancel(Consumer::shared_ptr c){ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->pop(msg)) + if (messages->consume(msg)) observeAcquire(msg, locker); return msg; } @@ -687,6 +681,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> // Update observers and message state: observeAcquire(*qmsg, locker); dequeue(0, *qmsg); + QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); // now reroute if necessary if (dest.get()) { assert(qmsg->payload); @@ -718,24 +713,11 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, return c.matches.size(); } -/** Acquire the front (oldest) message from the in-memory queue. - * assumes messageLock held by caller - */ -void Queue::pop(const Mutex::ScopedLock& locker) -{ - assertClusterSafe(); - QueuedMessage msg; - if (messages->pop(msg)) { - observeAcquire(msg, locker); - ++dequeueSincePurge; - } -} - /** Acquire the message at the given position, return true and msg if acquire succeeds */ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, const Mutex::ScopedLock& locker) { - if (messages->remove(position, msg)) { + if (messages->acquire(position, msg)) { observeAcquire(msg, locker); ++dequeueSincePurge; return true; @@ -952,12 +934,14 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) * Removes the first (oldest) message from the in-memory delivery queue as well dequeing * it from the logical (and persistent if applicable) queue */ -void Queue::popAndDequeue(const Mutex::ScopedLock& held) +bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) { - if (!messages->empty()) { - QueuedMessage msg = messages->front(); - pop(held); + if (messages->consume(msg)) { + observeAcquire(msg, locker); dequeue(0, msg); + return true; + } else { + return false; } } @@ -969,6 +953,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { mgntDeqStats(msg.payload); if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -1167,8 +1152,9 @@ void Queue::destroyed() unbind(broker->getExchanges()); { Mutex::ScopedLock locker(messageLock); - while(!messages->empty()){ - DeliverableMessage msg(messages->front().payload); + QueuedMessage m; + while(popAndDequeue(m, locker)) { + DeliverableMessage msg(m.payload); if (alternateExchange.get()) { if (brokerMgmtObject) brokerMgmtObject->inc_abandonedViaAlt(); @@ -1177,7 +1163,6 @@ void Queue::destroyed() if (brokerMgmtObject) brokerMgmtObject->inc_abandoned(); } - popAndDequeue(locker); } if (alternateExchange.get()) alternateExchange->decAlternateUsers(); @@ -1191,6 +1176,10 @@ void Queue::destroyed() } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); + { + Mutex::ScopedLock locker(messageLock); + observers.clear(); + } } void Queue::notifyDeleted() @@ -1477,6 +1466,7 @@ void Queue::query(qpid::types::Variant::Map& results) const void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; + QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { @@ -1549,9 +1539,9 @@ QueueListeners& Queue::getListeners() { return listeners; } Messages& Queue::getMessages() { return *messages; } const Messages& Queue::getMessages() const { return *messages; } -void Queue::checkNotDeleted() +void Queue::checkNotDeleted(const Consumer::shared_ptr& c) { - if (deleted) { + if (deleted && !c->hideDeletedError()) { throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted.")); } } @@ -1562,6 +1552,12 @@ void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) observers.insert(observer); } +void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) +{ + Mutex::ScopedLock locker(messageLock); + observers.erase(observer); +} + void Queue::flush() { ScopedUse u(barrier); @@ -1584,7 +1580,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } -const Broker* Queue::getBroker() +Broker* Queue::getBroker() { return broker; } @@ -1593,6 +1589,29 @@ void Queue::setDequeueSincePurge(uint32_t value) { dequeueSincePurge = value; } +namespace{ +class FindLowest +{ + public: + FindLowest() : init(false) {} + void process(const QueuedMessage& message) { + QPID_LOG(debug, "FindLowest processing: " << message.position); + if (!init || message.position < lowest) lowest = message.position; + init = true; + } + bool getLowest(qpid::framing::SequenceNumber& result) { + if (init) { + result = lowest; + return true; + } else { + return false; + } + } + private: + bool init; + qpid::framing::SequenceNumber lowest; +}; +} Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} |