diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 67 |
1 files changed, 41 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 43d1a2b27c..d5dc3e85f1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -104,7 +105,6 @@ Queue::Queue(const string& _name, bool _autodelete, policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0), insertSeqNo(0), broker(b), deleted(false), @@ -168,7 +168,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ }else { push(msg); } - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -187,7 +186,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued - mgntEnqStats(msg); if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -202,7 +200,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -527,14 +524,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ dequeueRequired = messages->push(qm, removed); listeners.populate(copy); - - if (eventMode) { - if (eventMgr) eventMgr->enqueued(qm); - else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); - } - if (policy.get()) { - policy->enqueued(qm); - } + enqueued(qm); } copy.notify(); if (dequeueRequired) { @@ -717,8 +707,12 @@ void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); - if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { - eventMgr->dequeued(msg); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->dequeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); + } } } @@ -736,12 +730,15 @@ void Queue::configure(const FieldTable& _settings, bool recovering) { eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (eventMode && broker) { + broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); + } if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); - } else if (eventMgr && !eventMgr->isSync() ) { + } else if (broker && !(broker->getQueueEvents().isSync()) ) { QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); } FieldTable copy(_settings); @@ -750,6 +747,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } else { setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); } + if (broker && broker->getManagementAgent()) { + ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings); + } + //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); @@ -1027,11 +1028,6 @@ SequenceNumber Queue::getPosition() { int Queue::getEventMode() { return eventMode; } -void Queue::setQueueEventManager(QueueEvents& mgr) -{ - eventMgr = &mgr; -} - void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange @@ -1057,14 +1053,28 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { - if (m.payload) { - if (policy.get()) { - policy->recoverEnqueued(m.payload); - policy->enqueued(m); + for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { + try { + (*i)->enqueued(m); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } - mgntEnqStats(m.payload); + } + if (policy.get()) { + policy->enqueued(m); + } + mgntEnqStats(m.payload); +} + +void Queue::updateEnqueued(const QueuedMessage& m) +{ + if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); + if (policy.get()) { + policy->recoverEnqueued(payload); + } + enqueued(m); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1086,6 +1096,11 @@ void Queue::checkNotDeleted() } } +void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) +{ + observers.insert(observer); +} + void Queue::flush() { ScopedUse u(barrier); |