summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp67
1 files changed, 41 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 43d1a2b27c..d5dc3e85f1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -104,7 +105,6 @@ Queue::Queue(const string& _name, bool _autodelete,
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0),
insertSeqNo(0),
broker(b),
deleted(false),
@@ -168,7 +168,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
}else {
push(msg);
}
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -187,7 +186,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
- mgntEnqStats(msg);
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -202,7 +200,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -527,14 +524,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
-
- if (eventMode) {
- if (eventMgr) eventMgr->enqueued(qm);
- else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
- }
- if (policy.get()) {
- policy->enqueued(qm);
- }
+ enqueued(qm);
}
copy.notify();
if (dequeueRequired) {
@@ -717,8 +707,12 @@ void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
- if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
- eventMgr->dequeued(msg);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->dequeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+ }
}
}
@@ -736,12 +730,15 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
{
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+ if (eventMode && broker) {
+ broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+ }
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (eventMgr && !eventMgr->isSync() ) {
+ } else if (broker && !(broker->getQueueEvents().isSync()) ) {
QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
}
FieldTable copy(_settings);
@@ -750,6 +747,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
} else {
setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
}
+ if (broker && broker->getManagementAgent()) {
+ ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+ }
+
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
@@ -1027,11 +1028,6 @@ SequenceNumber Queue::getPosition() {
int Queue::getEventMode() { return eventMode; }
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
- eventMgr = &mgr;
-}
-
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
@@ -1057,14 +1053,28 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
- if (m.payload) {
- if (policy.get()) {
- policy->recoverEnqueued(m.payload);
- policy->enqueued(m);
+ for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+ try {
+ (*i)->enqueued(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
- mgntEnqStats(m.payload);
+ }
+ if (policy.get()) {
+ policy->enqueued(m);
+ }
+ mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+ if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ }
+ enqueued(m);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1086,6 +1096,11 @@ void Queue::checkNotDeleted()
}
}
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ observers.insert(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);