summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp146
1 files changed, 138 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dd23760922..969d510e26 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -109,6 +109,7 @@ Queue::Queue(const string& _name, bool _autodelete,
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
+ brokerMgmtObject(0),
eventMode(0),
insertSeqNo(0),
broker(b),
@@ -123,14 +124,20 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent != 0) {
mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
agent->addObject(mgmtObject, 0, store != 0);
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_queueCount();
}
}
}
Queue::~Queue()
{
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
+ if (brokerMgmtObject)
+ brokerMgmtObject->dec_queueCount();
+ }
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -204,6 +211,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgTxnEnqueues ();
+ brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ }
}
}
@@ -221,7 +232,13 @@ void Queue::requeue(const QueuedMessage& msg){
if (alternateExchange.get()) {
DeliverableMessage dmsg(msg.payload);
alternateExchange->routeWithAlternate(dmsg);
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
+ mgntDeqStats(msg.payload);
} else {
messages->reinsert(msg);
listeners.populate(copy);
@@ -234,8 +251,8 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
+ observeRequeue(msg, locker);
}
- observeRequeue(msg, locker);
}
copy.notify();
}
@@ -323,6 +340,11 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
c->setPosition(msg.position);
acquire( msg.position, msg, locker);
dequeue( 0, msg );
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
+ }
continue;
}
@@ -504,6 +526,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
+ //
+ // Report the count of discarded-by-ttl messages
+ //
+ if (mgmtObject && !expired.empty()) {
+ mgmtObject->inc_discardsTtl(expired.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(expired.size());
+ }
+
for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
i != expired.end(); ++i) {
{
@@ -638,6 +669,19 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
Mutex::ScopedLock locker(messageLock);
messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+
+ if (mgmtObject && !c.matches.empty()) {
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_reroutes(c.matches.size());
+ } else {
+ mgmtObject->inc_discardsPurge(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ }
+ }
+
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
qmsg != c.matches.end(); ++qmsg) {
// Update observers and message state:
@@ -710,8 +754,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
- if (dequeueRequired)
+ if (dequeueRequired) {
observeAcquire(removed, locker);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsLvq();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsLvq();
+ }
+ }
listeners.populate(copy);
observeEnqueue(qm, locker);
}
@@ -799,10 +849,30 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
- policy->tryEnqueue(msg);
+ try {
+ policy->tryEnqueue(msg);
+ } catch(ResourceLimitExceededException&) {
+ if (mgmtObject) {
+ mgmtObject->inc_discardsOverflow();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsOverflow();
+ }
+ throw;
+ }
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
+
+ //
+ // Count the dequeues as ring-discards. We know that these aren't rejects because
+ // policy->tryEnqueue would have thrown an exception.
+ //
+ if (mgmtObject && !dequeues.empty()) {
+ mgmtObject->inc_discardsRing(dequeues.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsRing(dequeues.size());
+ }
+
for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -871,6 +941,10 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgTxnDequeues();
+ brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ }
}
}
@@ -893,8 +967,8 @@ void Queue::popAndDequeue(const Mutex::ScopedLock& held)
*/
void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
- if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
+ if (policy.get()) policy->dequeued(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -909,6 +983,12 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
*/
void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->acquired(msg);
@@ -923,6 +1003,12 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
*/
void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
+ }
+
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->requeued(msg);
@@ -1079,14 +1165,22 @@ void Queue::configureImpl(const FieldTable& _settings)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- if (alternateExchange.get()) {
+ {
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
- alternateExchange->routeWithAlternate(msg);
+ if (alternateExchange.get()) {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ alternateExchange->routeWithAlternate(msg);
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
+ }
popAndDequeue(locker);
}
- alternateExchange->decAlternateUsers();
+ if (alternateExchange.get())
+ alternateExchange->decAlternateUsers();
}
if (store) {
@@ -1124,6 +1218,8 @@ void Queue::unbind(ExchangeRegistry& exchanges)
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
policy = _policy;
+ if (policy.get())
+ policy->setQueue(this);
}
const QueuePolicy* Queue::getPolicy()
@@ -1291,6 +1387,40 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+void Queue::countRejected() const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_discardsSubscriber();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsSubscriber();
+ }
+}
+
+void Queue::countFlowedToDisk(uint64_t size) const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_msgFtdEnqueues();
+ mgmtObject->inc_byteFtdEnqueues(size);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgFtdEnqueues();
+ brokerMgmtObject->inc_byteFtdEnqueues(size);
+ }
+ }
+}
+
+void Queue::countLoadedFromDisk(uint64_t size) const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_msgFtdDequeues();
+ mgmtObject->inc_byteFtdDequeues(size);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgFtdDequeues();
+ brokerMgmtObject->inc_byteFtdDequeues(size);
+ }
+ }
+}
+
+
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;