diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 1293 |
1 files changed, 492 insertions, 801 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d5267c78dc..0dd4cb7b10 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -20,23 +20,23 @@ */ #include "qpid/broker/Queue.h" - #include "qpid/broker/Broker.h" -#include "qpid/broker/QueueEvents.h" +#include "qpid/broker/QueueCursor.h" +#include "qpid/broker/QueueDepth.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/broker/Exchange.h" -#include "qpid/broker/Fairshare.h" #include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/LegacyLVQ.h" -#include "qpid/broker/MessageDeque.h" -#include "qpid/broker/MessageMap.h" #include "qpid/broker/MessageStore.h" +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/MessageDistributor.h" +#include "qpid/broker/FifoDistributor.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/QueueFlowLimit.h" -#include "qpid/broker/ThresholdAlerts.h" -#include "qpid/broker/FifoDistributor.h" -#include "qpid/broker/MessageGroupManager.h" +//TODO: get rid of this +#include "qpid/broker/amqp_0_10/MessageTransfer.h" + +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" @@ -76,26 +76,8 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { -const std::string qpidMaxSize("qpid.max_size"); -const std::string qpidMaxCount("qpid.max_count"); -const std::string qpidNoLocal("no-local"); -const std::string qpidTraceIdentity("qpid.trace.id"); -const std::string qpidTraceExclude("qpid.trace.exclude"); -const std::string qpidLastValueQueueKey("qpid.last_value_queue_key"); -const std::string qpidLastValueQueue("qpid.last_value_queue"); -const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); -const std::string qpidPersistLastNode("qpid.persist_last_node"); -const std::string qpidVQMatchProperty("qpid.LVQ_key"); -const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); -const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout"); -//following feature is not ready for general use as it doesn't handle -//the case where a message is enqueued on more than one queue well enough: -const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); - -const int ENQUEUE_ONLY=1; -const int ENQUEUE_AND_DEQUEUE=2; - -inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg, + +inline void mgntEnqStats(const Message& msg, _qmf::Queue* mgmtObject, _qmf::Broker* brokerMgmtObject) { @@ -103,12 +85,12 @@ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg, _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); - uint64_t contentSize = msg->contentSize(); + uint64_t contentSize = msg.getContentSize(); qStats->msgTotalEnqueues +=1; bStats->msgTotalEnqueues += 1; qStats->byteTotalEnqueues += contentSize; bStats->byteTotalEnqueues += contentSize; - if (msg->isPersistent ()) { + if (msg.isPersistent ()) { qStats->msgPersistEnqueues += 1; bStats->msgPersistEnqueues += 1; qStats->bytePersistEnqueues += contentSize; @@ -119,20 +101,20 @@ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg, } } -inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg, +inline void mgntDeqStats(const Message& msg, _qmf::Queue* mgmtObject, _qmf::Broker* brokerMgmtObject) { if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); - uint64_t contentSize = msg->contentSize(); + uint64_t contentSize = msg.getContentSize(); qStats->msgTotalDequeues += 1; bStats->msgTotalDequeues += 1; qStats->byteTotalDequeues += contentSize; bStats->byteTotalDequeues += contentSize; - if (msg->isPersistent ()){ + if (msg.isPersistent ()){ qStats->msgPersistDequeues += 1; bStats->msgPersistDequeues += 1; qStats->bytePersistDequeues += contentSize; @@ -143,43 +125,81 @@ inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg, } } -} // namespace +QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions) +{ + QueueSettings settings(inputs); + if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) { + settings.maxDepth.setSize(globalOptions.queueLimit); + } + return settings; +} + +} -Queue::Queue(const string& _name, bool _autodelete, +Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {} +bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw() +{ + try { + prepared = queue->enqueue(ctxt, message); + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to prepare: " << e.what()); + return false; + } +} +void Queue::TxPublish::commit() throw() +{ + try { + if (prepared) queue->process(message); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } +} +void Queue::TxPublish::rollback() throw() +{ + try { + if (prepared) queue->enqueueAborted(message); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to rollback: " << e.what()); + } +} + +Queue::Queue(const string& _name, const QueueSettings& _settings, MessageStore* const _store, - const OwnershipToken* const _owner, Manageable* parent, Broker* b) : name(_name), - autodelete(_autodelete), store(_store), - owner(_owner), + owner(0), consumerCount(0), browserCount(0), exclusive(0), - noLocal(false), persistLastNode(false), inLastNodeFailure(false), messages(new MessageDeque()), persistenceId(0), - policyExceeded(false), + settings(b ? merge(_settings, b->getOptions()) : _settings), mgmtObject(0), brokerMgmtObject(0), eventMode(0), - insertSeqNo(0), broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0), allocator(new FifoDistributor( *messages )) { + if (settings.maxDepth.hasCount()) current.setCount(0); + if (settings.maxDepth.hasSize()) current.setSize(0); + if (settings.traceExcludes.size()) { + split(traceExclude, settings.traceExcludes, ", "); + } + qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete); - mgmtObject->set_exclusive(_owner != 0); + mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); + mgmtObject->set_arguments(settings.asMap()); agent->addObject(mgmtObject, 0, store != 0); brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); if (brokerMgmtObject) @@ -197,32 +217,36 @@ Queue::~Queue() } } -bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) +bool isLocalTo(const OwnershipToken* token, const Message& msg) { - return token && token->isLocal(msg->getPublisher()); + return token && token->isLocal(msg.getPublisher()); } -bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) +bool Queue::isLocal(const Message& msg) { //message is considered local if it was published on the same //connection as that of the session which declared this queue //exclusive (owner) or which has an exclusive subscription //(exclusive) - return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); + return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); } -bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) +bool Queue::isExcluded(const Message& msg) { - return traceExclude.size() && msg->isExcluded(traceExclude); + return traceExclude.size() && msg.isExcluded(traceExclude); } -void Queue::deliver(boost::intrusive_ptr<Message> msg){ +void Queue::deliver(Message msg, TxBuffer* txn){ + //TODO: move some of this out of the queue and into the publishing + //'link' for whatever protocol is used; that would let protocol + //specific stuff be kept out the queue + // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; - if (msg->isImmediate() && getConsumerCount() == 0) { + if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) { if (alternateExchange) { - DeliverableMessage deliverable(msg); + DeliverableMessage deliverable(msg, 0); alternateExchange->route(deliverable); } } else if (isLocal(msg)) { @@ -232,47 +256,38 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); } else { - enqueue(0, msg); - push(msg); - QPID_LOG(debug, "Message " << msg << " enqueued on " << name); + if (txn) { + TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); + txn->enlist(op); + } else { + if (enqueue(0, msg)) { + push(msg); + QPID_LOG(debug, "Message " << msg << " enqueued on " << name); + } else { + QPID_LOG(debug, "Message " << msg << " dropped from " << name); + } + } } } -void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) +void Queue::recoverPrepared(const Message& msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->recoverEnqueued(msg); + current += QueueDepth(1, msg.getContentSize()); } -void Queue::recover(boost::intrusive_ptr<Message>& msg) +void Queue::recover(Message& msg) { - { - Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->recoverEnqueued(msg); - } - + recoverPrepared(msg); push(msg, true); - if (store){ - // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); - } - - if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { - //content has not been loaded, need to ensure that lazy loading mode is set: - //TODO: find a nicer way to do this - msg->releaseContent(store); - // NOTE: The log message in this section are used for flow-to-disk testing (which checks the log for the - // presence of this message). Do not change this without also checking these tests. - QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << - std::hex << msg->getPersistenceId() << std::dec << ": Content released after recovery"); - } } -void Queue::process(boost::intrusive_ptr<Message>& msg){ +void Queue::process(Message& msg) +{ push(msg); if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); - const uint64_t contentSize = msg->contentSize(); + const uint64_t contentSize = msg.getContentSize(); qStats->msgTxnEnqueues += 1; qStats->byteTxnEnqueues += contentSize; mgmtObject->statisticsUpdated(); @@ -285,46 +300,22 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } } -void Queue::requeue(const QueuedMessage& msg){ +void Queue::release(const QueueCursor& position, bool markRedelivered) +{ assertClusterSafe(); QueueListeners::NotificationSet copy; { - if (!isEnqueued(msg)) return; - if (deleted) { - // - // If the queue has been deleted, requeued messages must be sent to the alternate exchange - // if one is configured. - // - if (alternateExchange.get()) { - DeliverableMessage dmsg(msg.payload); - alternateExchange->routeWithAlternate(dmsg); - if (brokerMgmtObject) - brokerMgmtObject->inc_abandonedViaAlt(); - } else { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandoned(); - } - mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); - } else { - { - Mutex::ScopedLock locker(messageLock); - messages->release(msg); - observeRequeue(msg, locker); + Mutex::ScopedLock locker(messageLock); + if (!deleted) { + Message* message = messages->release(position); + if (message) { + if (!markRedelivered) message->undeliver(); listeners.populate(copy); - } - - if (mgmtObject) { - mgmtObject->inc_releases(); - if (brokerMgmtObject) - brokerMgmtObject->inc_releases(); - } - - // for persistLastNode - don't force a message twice to disk, but force it if no force before - if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { - msg.payload->forcePersistent(); - if (msg.payload->isForcedPersistent() ){ - boost::intrusive_ptr<Message> payload = msg.payload; - enqueue(0, payload); + observeRequeue(*message, locker); + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); } } } @@ -332,163 +323,118 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::dequeueMessageAt(const SequenceNumber& position) { - assertClusterSafe(); - QPID_LOG(debug, "Attempting to acquire message at " << position); - if (acquire(position, message)) { - QPID_LOG(debug, "Acquired message at " << position << " from " << name); - return true; - } else { - QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); - return false; - } -} - -bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) -{ - assertClusterSafe(); - QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - bool ok; + boost::intrusive_ptr<PersistableMessage> pmsg; { Mutex::ScopedLock locker(messageLock); - ok = allocator->allocate( consumer, msg ); - } - if (!ok) { - QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); - return false; - } - - QueuedMessage copy(msg); - if (acquire( msg.position, copy)) { - QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); - return true; + assertClusterSafe(); + QPID_LOG(debug, "Attempting to dequeue message at " << position); + QueueCursor cursor; + Message* msg = messages->find(position, &cursor); + if (msg) { + if (msg->isPersistent()) pmsg = msg->getPersistentContext(); + observeDequeue(*msg, locker); + messages->deleted(cursor); + } else { + QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message"); + return false; + } } - QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position"); - return false; + dequeueFromStore(pmsg); + return true; } -void Queue::notifyListener() +bool Queue::acquire(const QueueCursor& position, const std::string& consumer) { + Mutex::ScopedLock locker(messageLock); assertClusterSafe(); - QueueListeners::NotificationSet set; - { - Mutex::ScopedLock locker(messageLock); - if (messages->size()) { - listeners.populate(set); - } - } - set.notify(); -} + Message* msg; -bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) -{ - checkNotDeleted(c); - if (c->preAcquires()) { - switch (consumeNextMessage(m, c)) { - case CONSUMED: - return true; - case CANT_CONSUME: - notifyListener();//let someone else try - case NO_MESSAGES: - default: + msg = messages->find(position); + if (msg) { + QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence()); + if (!allocator->acquire(consumer, *msg)) { + QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name); return false; + } else { + observeAcquire(*msg, locker); + QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name); + return true; } } else { - return browseNextMessage(m, c); + QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name); + return false; } } -Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) +bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { + checkNotDeleted(c); + QueueListeners::NotificationSet set; while (true) { - QueuedMessage msg; - bool found; - { - Mutex::ScopedLock locker(messageLock); - found = allocator->nextConsumableMessage(c, msg); - if (!found) listeners.addListener(c); - } - if (!found) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - return NO_MESSAGES; - } - - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->setPosition(msg.position); - dequeue(0, msg); - if (mgmtObject) { - mgmtObject->inc_discardsTtl(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(); - } - continue; - } - - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - { - Mutex::ScopedLock locker(messageLock); - bool ok = allocator->allocate( c->getName(), msg ); // inform allocator - (void) ok; assert(ok); - observeAcquire(msg, locker); - } + //TODO: reduce lock scope + Mutex::ScopedLock locker(messageLock); + Message* msg = messages->next(*c); + if (msg) { + if (msg->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + observeDequeue(*msg, locker); + //ERROR: don't hold lock across call to store!! + if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext()); if (mgmtObject) { - mgmtObject->inc_acquires(); + mgmtObject->inc_discardsTtl(); if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); + brokerMgmtObject->inc_discardsTtl(); } - m = msg; - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + messages->deleted(*c); + continue; } - } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - } - - Mutex::ScopedLock locker(messageLock); - messages->release(msg); - return CANT_CONSUME; - } -} -bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) -{ - while (true) { - QueuedMessage msg; - bool found; - { - Mutex::ScopedLock locker(messageLock); - found = allocator->nextBrowsableMessage(c, msg); - if (!found) listeners.addListener(c); - } - if (!found) { // no next available - QPID_LOG(debug, "No browsable messages available for consumer " << - c->getName() << " on queue '" << name << "'"); - return false; - } - - if (c->filter(msg.payload) && !msg.payload->hasExpired()) { - if (c->accept(msg.payload)) { - //consumer wants the message - c->setPosition(msg.position); - m = msg; - return true; + if (c->filter(*msg)) { + if (c->accept(*msg)) { + if (c->preAcquires()) { + QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState()); + if (allocator->acquire(c->getName(), *msg)) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + observeAcquire(*msg, locker); + msg->deliver(); + } else { + QPID_LOG(debug, "Could not acquire message from '" << name << "'"); + continue; //try another message + } + } + QPID_LOG(debug, "Message retrieved from '" << name << "'"); + m = *msg; + return true; + } else { + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + if (c->preAcquires()) { + //let someone else try + listeners.populate(set); + } + break; + } } else { - //browser hasn't got enough credit for the message - QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'"); - return false; + //consumer will never want this message, try another one + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + if (c->preAcquires()) { + //let someone else try to take this one + listeners.populate(set); + } } } else { - //consumer will never want this message, continue seeking - QPID_LOG(debug, "Browser skipping message from '" << name << "'"); - c->setPosition(msg.position); + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + listeners.addListener(c); + return false; } } + set.notify(); return false; } @@ -507,23 +453,28 @@ void Queue::removeListener(Consumer::shared_ptr c) bool Queue::dispatch(Consumer::shared_ptr c) { - QueuedMessage msg(this); + Message msg; if (getNextMessage(msg, c)) { - c->deliver(msg); + c->deliver(*c, msg); return true; } else { return false; } } -bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { +bool Queue::find(SequenceNumber pos, Message& msg) const +{ Mutex::ScopedLock locker(messageLock); - if (messages->find(pos, msg)) + Message* ptr = messages->find(pos, 0); + if (ptr) { + msg = *ptr; return true; + } return false; } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) +{ assertClusterSafe(); { Mutex::ScopedLock locker(messageLock); @@ -550,7 +501,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ browserCount++; consumerCount++; //reset auto deletion timer if necessary - if (autoDeleteTimeout && autoDeleteTask) { + if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); } observeConsumerAdd(*c, locker); @@ -559,7 +510,8 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ mgmtObject->inc_consumerCount (); } -void Queue::cancel(Consumer::shared_ptr c){ +void Queue::cancel(Consumer::shared_ptr c) +{ removeListener(c); { Mutex::ScopedLock locker(messageLock); @@ -572,65 +524,6 @@ void Queue::cancel(Consumer::shared_ptr c){ mgmtObject->dec_consumerCount (); } -QueuedMessage Queue::get(){ - QueuedMessage msg(this); - bool ok; - { - Mutex::ScopedLock locker(messageLock); - ok = messages->consume(msg); - if (ok) observeAcquire(msg, locker); - } - - if (ok && mgmtObject) { - mgmtObject->inc_acquires(); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); - } - - return msg; -} - -namespace { -bool collectIf(QueuedMessage& qm, Messages::Predicate predicate, - std::deque<QueuedMessage>& collection) -{ - if (predicate(qm)) { - collection.push_back(qm); - return true; - } else { - return false; - } -} - -bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); } -} // namespace - -void Queue::dequeueIf(Messages::Predicate predicate, - std::deque<QueuedMessage>& dequeued) -{ - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued))); - } - if (!dequeued.empty()) { - if (mgmtObject) { - mgmtObject->inc_acquires(dequeued.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(dequeued.size()); - } - for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin(); - i != dequeued.end(); ++i) { - { - // KAG: should be safe to retake lock after the removeIf, since - // no other thread can touch these messages after the removeIf() call - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); - } - dequeue( 0, *i ); - } - } -} - /** *@param lapse: time since the last purgeExpired */ @@ -642,13 +535,17 @@ void Queue::purgeExpired(sys::Duration lapse) { dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - std::deque<QueuedMessage> dequeued; - dequeueIf(boost::bind(&isExpired, _1), dequeued); - if (dequeued.size()) { - if (mgmtObject) { - mgmtObject->inc_discardsTtl(dequeued.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(dequeued.size()); + uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER); + QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); + // + // Report the count of discarded-by-ttl messages + // + if (mgmtObject && count) { + mgmtObject->inc_acquires(count); + mgmtObject->inc_discardsTtl(count); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(count); + brokerMgmtObject->inc_discardsTtl(count); } } } @@ -663,7 +560,7 @@ namespace { static const std::string typeKey; static const std::string paramsKey; static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); - virtual bool match( const QueuedMessage& ) const { return true; } + virtual bool match( const Message& ) const { return true; } virtual ~MessageFilter() {} protected: MessageFilter() {}; @@ -687,13 +584,9 @@ namespace { static const std::string valueKey; HeaderMatchFilter( const std::string& _header, const std::string& _value ) : MessageFilter (), header(_header), value(_value) {} - bool match( const QueuedMessage& msg ) const + bool match( const Message& msg ) const { - const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders(); - if (!headers) return false; - FieldTable::ValuePtr h = headers->get(header); - if (!h || !h->convertsTo<std::string>()) return false; - return h->get<std::string>() == value; + return msg.getPropertyAsString(header) == value; } private: const std::string header; @@ -730,36 +623,68 @@ namespace { return new MessageFilter(); } - // used by removeIf() to collect all messages matching a filter, maximum match count is - // optional. - struct Collector { - const uint32_t maxMatches; - MessageFilter& filter; - std::deque<QueuedMessage> matches; - Collector(MessageFilter& filter, uint32_t max) - : maxMatches(max), filter(filter) {} - bool operator() (QueuedMessage& qm) - { - if (maxMatches == 0 || matches.size() < maxMatches) { - if (filter.match( qm )) { - matches.push_back(qm); - return true; - } - } + bool reroute(boost::shared_ptr<Exchange> e, const Message& m) + { + if (e) { + DeliverableMessage d(m, 0); + d.getMessage().clearTrace(); + e->routeWithAlternate(d); + return true; + } else { return false; } - }; - + } + void moveTo(boost::shared_ptr<Queue> q, Message& m) + { + if (q) { + q->deliver(m); + } + } } // end namespace +uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type) +{ + std::deque<Message> removed; + { + QueueCursor c(type); + uint32_t count(0); + Mutex::ScopedLock locker(messageLock); + Message* m = messages->next(c); + while (m){ + if (!p || p(*m)) { + if (!maxCount || count++ < maxCount) { + if (m->getState() == AVAILABLE) { + //don't actually acquire, just act as if we did + observeAcquire(*m, locker); + } + observeDequeue(*m, locker); + removed.push_back(*m);//takes a copy of the message + if (!messages->deleted(c)) { + QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!"); + assert(false); + } + } else { + break; + } + } + m = messages->next(c); + } + } + for (std::deque<Message>::iterator i = removed.begin(); i != removed.end(); ++i) { + if (f) f(*i);//ERROR? need to clear old persistent context? + if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing + } + return removed.size(); +} + /** * purge - for purging all or some messages on a queue * depending on the purge_request * - * purge_request == 0 then purge all messages - * == N then purge N messages from queue - * Sometimes purge_request == 1 to unblock the top of queue + * qty == 0 then purge all messages + * == N then purge N messages from queue + * Sometimes qty == 1 to unblock the top of queue * * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, @@ -768,172 +693,53 @@ namespace { * An optional filter can be supplied that will be applied against each message. The * message is purged only if the filter matches. See MessageDistributor for more detail. */ -uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, +uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest, const qpid::types::Variant::Map *filter) { std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); - Collector c(*mf.get(), purge_request); - - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); - } + uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/); - if (!c.matches.empty()) { - if (mgmtObject) { - mgmtObject->inc_acquires(c.matches.size()); - if (dest.get()) { - mgmtObject->inc_reroutes(c.matches.size()); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(c.matches.size()); - brokerMgmtObject->inc_reroutes(c.matches.size()); - } - } else { - mgmtObject->inc_discardsPurge(c.matches.size()); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(c.matches.size()); - brokerMgmtObject->inc_discardsPurge(c.matches.size()); - } - } - } - - for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { - - { - // KAG: should be safe to retake lock after the removeIf, since - // no other thread can touch these messages after the removeIf call - Mutex::ScopedLock locker(messageLock); - observeAcquire(*qmsg, locker); + if (mgmtObject && count) { + mgmtObject->inc_acquires(count); + if (dest.get()) { + mgmtObject->inc_reroutes(count); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(count); + brokerMgmtObject->inc_reroutes(count); } - dequeue(0, *qmsg); - QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); - // now reroute if necessary - if (dest.get()) { - assert(qmsg->payload); - qmsg->payload->clearTrace(); - DeliverableMessage dmsg(qmsg->payload); - dest->routeWithAlternate(dmsg); + } else { + mgmtObject->inc_discardsPurge(count); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(count); + brokerMgmtObject->inc_discardsPurge(count); } } } - return c.matches.size(); + + return count; } uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, const qpid::types::Variant::Map *filter) { std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); - Collector c(*mf.get(), qty); - - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); - } - - - if (!c.matches.empty()) { - // Update observers and message state: - - if (mgmtObject) { - mgmtObject->inc_acquires(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(c.matches.size()); - } - - for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { - { - Mutex::ScopedLock locker(messageLock); - observeAcquire(*qmsg, locker); - } - dequeue(0, *qmsg); - // and move to destination Queue. - assert(qmsg->payload); - destq->deliver(qmsg->payload); - } - } - return c.matches.size(); + return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/); } -/** Acquire the message at the given position, return true and msg if acquire succeeds */ -bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg) +void Queue::push(Message& message, bool /*isRecovery*/) { - bool ok; - { - Mutex::ScopedLock locker(messageLock); - ok = messages->acquire(position, msg); - if (ok) observeAcquire(msg, locker); - } - if (ok) { - if (mgmtObject) { - mgmtObject->inc_acquires(); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); - } - ++dequeueSincePurge; - return true; - } - return false; -} - -void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; - QueuedMessage removed, qm(this, msg); - bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); - qm.position = ++sequence; - if (messages->push(qm, removed)) { - dequeueRequired = true; - observeAcquire(removed, locker); - } - observeEnqueue(qm, locker); - if (policy.get()) { - policy->enqueued(qm); - } + message.setSequence(++sequence); + messages->publish(message); listeners.populate(copy); - } - if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position); - - mgntEnqStats(msg, mgmtObject, brokerMgmtObject); - - if (dequeueRequired) { - if (mgmtObject) { - mgmtObject->inc_acquires(); - mgmtObject->inc_discardsLvq(); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(); - brokerMgmtObject->inc_discardsLvq(); - } - } - if (isRecovery) { - //can't issue new requests for the store until - //recovery is complete - Mutex::ScopedLock locker(messageLock); - pendingDequeues.push_back(removed); - } else { - dequeue(0, removed); - } + observeEnqueue(message, locker); } copy.notify(); } -void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) -{ - if (message.payload->isIngressComplete()) (*result)++; -} - -/** function only provided for unit tests, or code not in critical message path */ -uint32_t Queue::getEnqueueCompleteMessageCount() const -{ - uint32_t count = 0; - Mutex::ScopedLock locker(messageLock); - messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); - return count; -} - uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); @@ -949,7 +755,7 @@ uint32_t Queue::getConsumerCount() const bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(messageLock); - return autodelete && !consumerCount && !owner; + return settings.autodelete && !consumerCount && !owner; } void Queue::clearLastNodeFailure() @@ -957,14 +763,9 @@ void Queue::clearLastNodeFailure() inLastNodeFailure = false; } -void Queue::forcePersistent(QueuedMessage& message) +void Queue::forcePersistent(const Message& /*message*/) { - if(!message.payload->isStoredOnQueue(shared_from_this())) { - message.payload->forcePersistent(); - if (message.payload->isForcedPersistent() ){ - enqueue(0, message.payload); - } - } + //TODO } void Queue::setLastNodeFailure() @@ -982,153 +783,129 @@ void Queue::setLastNodeFailure() } -// return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck) +/* + * return true if enqueue succeeded and message should be made + * available; returning false will result in the message being dropped + */ +bool Queue::enqueue(TransactionContext* ctxt, Message& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - if (policy.get() && !suppressPolicyCheck) { - std::deque<QueuedMessage> dequeues; - { - Mutex::ScopedLock locker(messageLock); - try { - policy->tryEnqueue(msg); - } catch(ResourceLimitExceededException&) { - if (mgmtObject) { - mgmtObject->inc_discardsOverflow(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsOverflow(); - } - throw; - } - policy->getPendingDequeues(dequeues); - } - //depending on policy, may have some dequeues that need to performed without holding the lock - - // - // Count the dequeues as ring-discards. We know that these aren't rejects because - // policy->tryEnqueue would have thrown an exception. - // - if (mgmtObject && !dequeues.empty()) { - mgmtObject->inc_discardsRing(dequeues.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsRing(dequeues.size()); + { + Mutex::ScopedLock locker(messageLock); + if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) { + return false; } - - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ - msg->forcePersistent(); + forcePersistent(msg); } - if (traceId.size()) { - msg->addTraceId(traceId); + if (settings.traceId.size()) { + msg.addTraceId(settings.traceId); } - if ((msg->isPersistent() || msg->checkContentReleasable()) && store) { + if (msg.isPersistent() && store) { // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() // when it considers the message stored. - msg->enqueueAsync(shared_from_this(), store); - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext(); + assert(pmsg); + pmsg->enqueueAsync(shared_from_this(), store); store->enqueue(ctxt, pmsg, *this); - return true; } - if (!store) { - //Messages enqueued on a transient queue should be prevented - //from having their content released as it may not be - //recoverable by these queue for delivery - msg->blockContentRelease(); - } - return false; + return true; } -void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) +void Queue::enqueueAborted(const Message& msg) { + //Called when any transactional enqueue is aborted (including but + //not limited to a recovered dtx transaction) Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); + current -= QueueDepth(1, msg.getContentSize()); } -// return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) +void Queue::enqueueCommited(Message& msg) { - ScopedUse u(barrier); - if (!u.acquired) return false; - { - Mutex::ScopedLock locker(messageLock); - if (!isEnqueued(msg)) return false; - if (!ctxt) { - if (policy.get()) policy->dequeued(msg); - messages->deleted(msg); - observeDequeue(msg, locker); - } + //called when a recovered dtx enqueue operation is committed; the + //message is already on disk and space has been reserved in policy + //but it should now be made available + process(msg); +} +void Queue::dequeueAborted(Message& msg) +{ + //called when a recovered dtx dequeue operation is aborted; the + //message should be added back to the queue + push(msg); +} +void Queue::dequeueCommited(const Message& msg) +{ + //called when a recovered dtx dequeue operation is committed; the + //message will at this point have already been removed from the + //store and will not be available for delivery. The only action + //required is to ensure the observers are notified and the + //management stats are correctly decremented + Mutex::ScopedLock locker(messageLock); + observeDequeue(msg, locker); + if (mgmtObject != 0) { + mgmtObject->inc_msgTxnDequeues(); + mgmtObject->inc_byteTxnDequeues(msg.getContentSize()); } +} - if (!ctxt) { - mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); - } - // This check prevents messages which have been forced persistent on one queue from dequeuing - // from another on which no forcing has taken place and thus causing a store error. - bool fp = msg.payload->isForcedPersistent(); - if (!fp || (fp && msg.payload->isStoredOnQueue(shared_from_this()))) { - if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) { - msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); - store->dequeue(ctxt, pmsg, *this); - return true; - } +void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg) +{ + ScopedUse u(barrier); + if (u.acquired && msg && store) { + store->dequeue(0, msg, *this); } - return false; } -void Queue::dequeueCommitted(const QueuedMessage& msg) +void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) { + ScopedUse u(barrier); + if (!u.acquired) return; + boost::intrusive_ptr<PersistableMessage> pmsg; { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->dequeued(msg); - messages->deleted(msg); - observeDequeue(msg, locker); + Message* msg = messages->find(cursor); + if (msg) { + if (msg->isPersistent()) pmsg = msg->getPersistentContext(); + if (!ctxt) { + observeDequeue(*msg, locker); + messages->deleted(cursor);//message pointer not valid after this + } + } else { + return; + } } - mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); - if (mgmtObject != 0) { - _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); - const uint64_t contentSize = msg.payload->contentSize(); - qStats->msgTxnDequeues += 1; - qStats->byteTxnDequeues += contentSize; - mgmtObject->statisticsUpdated(); + if (store && pmsg) { + store->dequeue(ctxt, pmsg, *this); + } +} + +void Queue::dequeueCommitted(const QueueCursor& cursor) +{ + Mutex::ScopedLock locker(messageLock); + Message* msg = messages->find(cursor); + if (msg) { + const uint64_t contentSize = msg->getContentSize(); + observeDequeue(*msg, locker); + if (mgmtObject != 0) { + mgmtObject->inc_msgTxnDequeues(); + mgmtObject->inc_byteTxnDequeues(contentSize); + } if (brokerMgmtObject) { _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); bStats->msgTxnDequeues += 1; bStats->byteTxnDequeues += contentSize; brokerMgmtObject->statisticsUpdated(); } - } -} - -/** - * Removes the first (oldest) message from the in-memory delivery queue as well dequeing - * it from the logical (and persistent if applicable) queue - */ -bool Queue::popAndDequeue(QueuedMessage& msg) -{ - bool popped; - { - Mutex::ScopedLock locker(messageLock); - popped = messages->consume(msg); - if (popped) observeAcquire(msg, locker); - } - if (popped) { - if (mgmtObject) { - mgmtObject->inc_acquires(); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); - } - dequeue(0, msg); - return true; + messages->deleted(cursor); } else { - return false; + QPID_LOG(error, "Could not find dequeued message on commit"); } } @@ -1136,8 +913,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg) * Updates policy and management when a message has been dequeued, * Requires messageLock be held by caller. */ -void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&) { + current -= QueueDepth(1, msg.getContentSize()); + mgntDeqStats(msg, mgmtObject, brokerMgmtObject); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -1150,7 +929,7 @@ void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::Sco /** updates queue observers when a message has become unavailable for transfer. * Requires messageLock be held by caller. */ -void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1164,7 +943,7 @@ void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::Sco /** updates queue observers when a message has become re-available for transfer * Requires messageLock be held by caller. */ -void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1202,13 +981,11 @@ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::Sc } -void Queue::create(const FieldTable& _settings) +void Queue::create() { - settings = _settings; if (store) { - store->create(*this, _settings); + store->create(*this, settings.storeSettings); } - configureImpl(_settings); } @@ -1258,112 +1035,21 @@ bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string } } -void Queue::configure(const FieldTable& _settings) +void Queue::abandoned(const Message& message) { - settings = _settings; - configureImpl(settings); -} - -void Queue::configureImpl(const FieldTable& _settings) -{ - eventMode = _settings.getAsInt(qpidQueueEventGeneration); - if (eventMode && broker) { - broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); - } - - if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { - if ( NullMessageStore::isNullStore(store)) { - QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); - } else if (broker && !(broker->getQueueEvents().isSync()) ) { - QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); - } - FieldTable copy(_settings); - copy.erase(QueuePolicy::typeKey); - setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); - } else { - setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); - } - if (broker && broker->getManagementAgent()) { - ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio); - } - - //set this regardless of owner to allow use of no-local with exclusive consumers also - noLocal = getBoolSetting(_settings, qpidNoLocal); - QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); - - std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); - if (lvqKey.size()) { - QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); - messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); - allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); - } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) { - QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); - messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); - allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); - } else if (getBoolSetting(_settings, qpidLastValueQueue)) { - QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); - messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); - allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); - } else { - std::auto_ptr<Messages> m = Fairshare::create(_settings); - if (m.get()) { - messages = m; - allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); - QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); - } else { // default (FIFO) queue type - // override default message allocator if message groups configured. - boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings)); - if (mgm) { - allocator = mgm; - addObserver(mgm); - } - } - } - - persistLastNode = getBoolSetting(_settings, qpidPersistLastNode); - if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); - - traceId = _settings.getAsString(qpidTraceIdentity); - std::string excludeList = _settings.getAsString(qpidTraceExclude); - if (excludeList.size()) { - split(traceExclude, excludeList, ", "); - } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId - << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); - - FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); - if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); - - autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); - if (autoDeleteTimeout) - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); - - if (mgmtObject != 0) { - mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); - } - - QueueFlowLimit::observe(*this, _settings); + if (reroute(alternateExchange, message) && brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + else if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } void Queue::destroyed() { unbind(broker->getExchanges()); - - QueuedMessage m; - while(popAndDequeue(m)) { - DeliverableMessage msg(m.payload); - if (alternateExchange.get()) { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandonedViaAlt(); - alternateExchange->routeWithAlternate(msg); - } else { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandoned(); - } - } - if (alternateExchange.get()) + remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/); + if (alternateExchange.get()) { alternateExchange->decAlternateUsers(); + } if (store) { barrier.destroy(); @@ -1401,20 +1087,6 @@ void Queue::unbind(ExchangeRegistry& exchanges) bindings.unbind(exchanges, shared_from_this()); } -void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) -{ - Mutex::ScopedLock locker(messageLock); - policy = _policy; - if (policy.get()) - policy->setQueue(this); -} - -const QueuePolicy* Queue::getPolicy() -{ - Mutex::ScopedLock locker(messageLock); - return policy.get(); -} - uint64_t Queue::getPersistenceId() const { return persistenceId; @@ -1434,10 +1106,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); - buffer.put(settings); - if (policy.get()) { - buffer.put(*policy); - } + buffer.put(encodableSettings); buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); } @@ -1445,21 +1114,19 @@ uint32_t Queue::encodedSize() const { return name.size() + 1/*short string size octet*/ + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */ - + settings.encodedSize() - + (policy.get() ? (*policy).encodedSize() : 0); + + encodableSettings.encodedSize(); } Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) { string name; buffer.getShortString(name); - FieldTable settings; - buffer.get(settings); + FieldTable ft; + buffer.get(ft); boost::shared_ptr<Exchange> alternate; - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true); - if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) { - buffer.get ( *(result.first->policy) ); - } + QueueSettings settings(true, false); + settings.populate(ft, settings.storeSettings); + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate, true); if (buffer.available()) { string altExch; buffer.getShortString(altExch); @@ -1523,8 +1190,8 @@ struct AutoDeleteTask : qpid::sys::TimerTask void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) { - if (queue->autoDeleteTimeout && queue->canAutoDelete()) { - AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); + if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) { + AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC)); queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time)); broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); @@ -1543,12 +1210,15 @@ void Queue::releaseExclusiveOwnership() { Mutex::ScopedLock locker(ownershipLock); owner = 0; + if (mgmtObject) { + mgmtObject->set_exclusive(false); + } } bool Queue::setExclusiveOwner(const OwnershipToken* const o) { //reset auto deletion timer if necessary - if (autoDeleteTimeout && autoDeleteTask) { + if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); } Mutex::ScopedLock locker(ownershipLock); @@ -1556,6 +1226,9 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) return false; } else { owner = o; + if (mgmtObject) { + mgmtObject->set_exclusive(true); + } return true; } } @@ -1687,7 +1360,7 @@ namespace { struct After { framing::SequenceNumber seq; After(framing::SequenceNumber s) : seq(s) {} - bool operator()(const QueuedMessage& qm) { return qm.position > seq; } + bool operator()(const Message& m) { return m.getSequence() > seq; } }; } // namespace @@ -1695,12 +1368,10 @@ struct After { void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); if (n < sequence) { - std::deque<QueuedMessage> dequeued; - dequeueIf(After(n), dequeued); - messages->setPosition(n); + remove(0, After(n), MessagePredicate(), BROWSER); } sequence = n; - QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); + QPID_LOG(debug, "Set position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { @@ -1721,25 +1392,16 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) << "\": exchange does not exist."); } //process any pending dequeues - std::deque<QueuedMessage> pd; - { - Mutex::ScopedLock locker(messageLock); - pendingDequeues.swap(pd); + for (std::vector<Message>::iterator i = pendingDequeues.begin(); i != pendingDequeues.end(); ++i) { + dequeueFromStore(i->getPersistentContext()); } - for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); -} - -void Queue::insertSequenceNumbers(const std::string& key) -{ - seqNoKey = key; - insertSeqNo = !seqNoKey.empty(); - QPID_LOG(debug, "Inserting sequence numbers as " << key); + pendingDequeues.clear(); } /** updates queue observers and state when a message has become available for transfer * Requires messageLock be held by caller. */ -void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1748,32 +1410,7 @@ void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::Scope QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } } -} - -void Queue::updateEnqueued(const QueuedMessage& m) -{ - if (m.payload) { - boost::intrusive_ptr<Message> payload = m.payload; - enqueue(0, payload, true); - { - Mutex::ScopedLock locker(messageLock); - messages->updateAcquired(m); - observeEnqueue(m, locker); - if (policy.get()) { - policy->recoverEnqueued(payload); - policy->enqueued(m); - } - } - mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject); - } else { - QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); - } -} - -bool Queue::isEnqueued(const QueuedMessage& msg) -{ - Mutex::ScopedLock locker(messageLock); - return !policy.get() || policy->isEnqueued(msg); + mgntEnqStats(m, mgmtObject, brokerMgmtObject); } // Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's @@ -1835,28 +1472,82 @@ void Queue::setDequeueSincePurge(uint32_t value) { dequeueSincePurge = value; } -namespace{ -class FindLowest +void Queue::reject(const QueueCursor& cursor) { - public: - FindLowest() : init(false) {} - void process(const QueuedMessage& message) { - QPID_LOG(debug, "FindLowest processing: " << message.position); - if (!init || message.position < lowest) lowest = message.position; - init = true; - } - bool getLowest(qpid::framing::SequenceNumber& result) { - if (init) { - result = lowest; - return true; + Exchange::shared_ptr alternate = getAlternateExchange(); + Message copy; + boost::intrusive_ptr<PersistableMessage> pmsg; + { + Mutex::ScopedLock locker(messageLock); + Message* message = messages->find(cursor); + if (message) { + if (alternate) copy = *message; + if (message->isPersistent()) pmsg = message->getPersistentContext(); + countRejected(); + observeDequeue(*message, locker); + messages->deleted(cursor); } else { - return false; + return; } } - private: - bool init; - qpid::framing::SequenceNumber lowest; -}; + if (alternate) { + copy.resetDeliveryCount(); + DeliverableMessage delivery(copy, 0); + alternate->routeWithAlternate(delivery); + QPID_LOG(info, "Routed rejected message from " << getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << getName()); + } + dequeueFromStore(pmsg); +} + +bool Queue::checkDepth(const QueueDepth& increment, const Message&) +{ + if (current && (settings.maxDepth - current < increment)) { + if (mgmtObject) { + mgmtObject->inc_discardsOverflow(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsOverflow(); + } + throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]")); + } else { + current += increment; + return true; + } +} + +bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate) +{ + Mutex::ScopedLock locker(messageLock); + //hold lock across calls to predicate, or take copy of message? + //currently hold lock, may want to revise depending on any new use + //cases + Message* message = messages->next(cursor); + while (message && (predicate && !predicate(*message))) { + message = messages->next(cursor); + } + return message != 0; +} + +bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start) +{ + Mutex::ScopedLock locker(messageLock); + //hold lock across calls to predicate, or take copy of message? + //currently hold lock, may want to revise depending on any new use + //cases + Message* message; + message = messages->find(start, &cursor); + if (message && (!predicate || predicate(*message))) return true; + + return seek(cursor, predicate); +} + +bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start) +{ + Mutex::ScopedLock locker(messageLock); + return messages->find(start, &cursor); } Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} |