diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 146 |
1 files changed, 138 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dd23760922..969d510e26 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -109,6 +109,7 @@ Queue::Queue(const string& _name, bool _autodelete, persistenceId(0), policyExceeded(false), mgmtObject(0), + brokerMgmtObject(0), eventMode(0), insertSeqNo(0), broker(b), @@ -123,14 +124,20 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent != 0) { mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); agent->addObject(mgmtObject, 0, store != 0); + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + if (brokerMgmtObject) + brokerMgmtObject->inc_queueCount(); } } } Queue::~Queue() { - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->resourceDestroy(); + if (brokerMgmtObject) + brokerMgmtObject->dec_queueCount(); + } } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -204,6 +211,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgTxnEnqueues (); + brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + } } } @@ -221,7 +232,13 @@ void Queue::requeue(const QueuedMessage& msg){ if (alternateExchange.get()) { DeliverableMessage dmsg(msg.payload); alternateExchange->routeWithAlternate(dmsg); + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } + mgntDeqStats(msg.payload); } else { messages->reinsert(msg); listeners.populate(copy); @@ -234,8 +251,8 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + observeRequeue(msg, locker); } - observeRequeue(msg, locker); } copy.notify(); } @@ -323,6 +340,11 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ c->setPosition(msg.position); acquire( msg.position, msg, locker); dequeue( 0, msg ); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); + } continue; } @@ -504,6 +526,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } + // + // Report the count of discarded-by-ttl messages + // + if (mgmtObject && !expired.empty()) { + mgmtObject->inc_discardsTtl(expired.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(expired.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); i != expired.end(); ++i) { { @@ -638,6 +669,19 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> Mutex::ScopedLock locker(messageLock); messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + + if (mgmtObject && !c.matches.empty()) { + if (dest.get()) { + mgmtObject->inc_reroutes(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_reroutes(c.matches.size()); + } else { + mgmtObject->inc_discardsPurge(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsPurge(c.matches.size()); + } + } + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); qmsg != c.matches.end(); ++qmsg) { // Update observers and message state: @@ -710,8 +754,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); - if (dequeueRequired) + if (dequeueRequired) { observeAcquire(removed, locker); + if (mgmtObject) { + mgmtObject->inc_discardsLvq(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsLvq(); + } + } listeners.populate(copy); observeEnqueue(qm, locker); } @@ -799,10 +849,30 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); - policy->tryEnqueue(msg); + try { + policy->tryEnqueue(msg); + } catch(ResourceLimitExceededException&) { + if (mgmtObject) { + mgmtObject->inc_discardsOverflow(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsOverflow(); + } + throw; + } policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock + + // + // Count the dequeues as ring-discards. We know that these aren't rejects because + // policy->tryEnqueue would have thrown an exception. + // + if (mgmtObject && !dequeues.empty()) { + mgmtObject->inc_discardsRing(dequeues.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsRing(dequeues.size()); + } + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -871,6 +941,10 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgTxnDequeues(); + brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + } } } @@ -893,8 +967,8 @@ void Queue::popAndDequeue(const Mutex::ScopedLock& held) */ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { - if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + if (policy.get()) policy->dequeued(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -909,6 +983,12 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) */ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->acquired(msg); @@ -923,6 +1003,12 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) */ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->requeued(msg); @@ -1079,14 +1165,22 @@ void Queue::configureImpl(const FieldTable& _settings) void Queue::destroyed() { unbind(broker->getExchanges()); - if (alternateExchange.get()) { + { Mutex::ScopedLock locker(messageLock); while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); - alternateExchange->routeWithAlternate(msg); + if (alternateExchange.get()) { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + alternateExchange->routeWithAlternate(msg); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); + } popAndDequeue(locker); } - alternateExchange->decAlternateUsers(); + if (alternateExchange.get()) + alternateExchange->decAlternateUsers(); } if (store) { @@ -1124,6 +1218,8 @@ void Queue::unbind(ExchangeRegistry& exchanges) void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) { policy = _policy; + if (policy.get()) + policy->setQueue(this); } const QueuePolicy* Queue::getPolicy() @@ -1291,6 +1387,40 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +void Queue::countRejected() const +{ + if (mgmtObject) { + mgmtObject->inc_discardsSubscriber(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsSubscriber(); + } +} + +void Queue::countFlowedToDisk(uint64_t size) const +{ + if (mgmtObject) { + mgmtObject->inc_msgFtdEnqueues(); + mgmtObject->inc_byteFtdEnqueues(size); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgFtdEnqueues(); + brokerMgmtObject->inc_byteFtdEnqueues(size); + } + } +} + +void Queue::countLoadedFromDisk(uint64_t size) const +{ + if (mgmtObject) { + mgmtObject->inc_msgFtdDequeues(); + mgmtObject->inc_byteFtdDequeues(size); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgFtdDequeues(); + brokerMgmtObject->inc_byteFtdDequeues(size); + } + } +} + + ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; |