diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 140 |
1 files changed, 75 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 789ad581f5..bacc612e11 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -65,7 +65,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->recoverEnqueued(msg); push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { @@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; messages->reinsert(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } @@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; pop(); return CONSUMED; @@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); return CANT_CONSUME; - } + } } } } @@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message +// Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); if (messages->next(c->position, msg)) { @@ -426,19 +426,25 @@ bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& messa } } -void Queue::purgeExpired() +/** + *@param lapse: time since the last purgeExpired + */ +void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + int count = dequeueSincePurge.get(); + dequeueSincePurge -= count; + int seconds = int64_t(lapse)/sys::TIME_SEC; + if (seconds == 0 || count / seconds < 1) { std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(expired.begin(), expired.end(), + boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -457,7 +463,7 @@ void Queue::purgeExpired() uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) { Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 + uint32_t purge_count = purge_request; // only comes into play if >0 std::deque<DeliverableMessage> rerouteQueue; uint32_t count = 0; @@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages->empty()) { @@ -508,7 +514,7 @@ void Queue::pop() { assertClusterSafe(); messages->pop(); - ++dequeueTracker; + ++dequeueSincePurge; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ @@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ QueuedMessage removed; bool dequeueRequired = false; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + dequeueRequired = messages->push(qm, removed); listeners.populate(copy); enqueued(qm); @@ -599,7 +605,7 @@ void Queue::setLastNodeFailure() } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck) { ScopedUse u(barrier); @@ -613,13 +619,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { //copy on write: take deep copy of message before modifying it //as the frames may already be available for delivery on other @@ -649,10 +655,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); + if (policy.get()) policy->enqueueAborted(msg); } -// return true if store exists, +// return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); @@ -661,7 +667,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { + if (!ctxt) { dequeued(msg); } } @@ -682,7 +688,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + dequeued(msg); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -737,8 +743,8 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri return v->get<int>(); } else if (v->convertsTo<std::string>()){ std::string s = v->get<std::string>(); - try { - return boost::lexical_cast<int>(s); + try { + return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) { QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); return 0; @@ -762,7 +768,7 @@ void Queue::configureImpl(const FieldTable& _settings) broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); } - if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); @@ -800,7 +806,7 @@ void Queue::configureImpl(const FieldTable& _settings) QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); } } - + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); @@ -809,15 +815,15 @@ void Queue::configureImpl(const FieldTable& _settings) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); - if (autoDeleteTimeout) - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); @@ -881,9 +887,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -897,11 +903,11 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); @@ -954,7 +960,7 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->destroyed(); @@ -966,7 +972,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Broker& broker; Queue::shared_ptr queue; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} void fire() @@ -984,27 +990,27 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) if (queue->autoDeleteTimeout && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); - broker.getClusterTimer().add(queue->autoDeleteTask); + broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { tryAutoDeleteImpl(broker, queue); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); @@ -1013,25 +1019,25 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; externalQueueStore = inst; if (inst) { @@ -1197,6 +1203,10 @@ const Broker* Queue::getBroker() return broker; } +void Queue::setDequeueSincePurge(uint32_t value) { + dequeueSincePurge = AtomicValue<uint32_t>(value); +} + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} |