summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-02-15 14:17:45 +0000
committerGordon Sim <gsim@apache.org>2011-02-15 14:17:45 +0000
commit6a87cd223a386526ccca0a1ba6f0bdd1c4320ba1 (patch)
tree15ca8c59c1a9a6786ab08690d7acf6320c1e1dc2
parent87381a0bae2b890688db1597d2d6a144935498a0 (diff)
downloadqpid-python-6a87cd223a386526ccca0a1ba6f0bdd1c4320ba1.tar.gz
QPID-3002: Configurable threshold alerts for queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1070913 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp67
-rw-r--r--cpp/src/qpid/broker/Queue.h12
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp25
-rw-r--r--cpp/src/qpid/broker/QueueEvents.h1
-rw-r--r--cpp/src/qpid/broker/QueueObserver.h82
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--cpp/src/qpid/broker/ThresholdAlerts.cpp139
-rw-r--r--cpp/src/qpid/broker/ThresholdAlerts.h146
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp2
13 files changed, 447 insertions, 40 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index a709056899..71dce075a2 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1011,6 +1011,7 @@ set (qpidbroker_SOURCES
qpid/broker/SessionHandler.h
qpid/broker/SessionHandler.cpp
qpid/broker/System.cpp
+ qpid/broker/ThresholdAlerts.cpp
qpid/broker/TopicExchange.cpp
qpid/broker/TxAccept.cpp
qpid/broker/TxBuffer.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index fc93653017..4e398ddc5e 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -603,6 +603,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueEvents.h \
qpid/broker/QueueListeners.cpp \
qpid/broker/QueueListeners.h \
+ qpid/broker/QueueObserver.h \
qpid/broker/QueuePolicy.cpp \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.cpp \
@@ -649,6 +650,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SignalHandler.h \
qpid/broker/System.cpp \
qpid/broker/System.h \
+ qpid/broker/ThresholdAlerts.cpp \
+ qpid/broker/ThresholdAlerts.h \
qpid/broker/TopicExchange.cpp \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index cb301916c2..3c692fc368 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -221,7 +221,6 @@ Broker::Broker(const Broker::Options& conf) :
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
- queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 43d1a2b27c..d5dc3e85f1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -104,7 +105,6 @@ Queue::Queue(const string& _name, bool _autodelete,
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0),
insertSeqNo(0),
broker(b),
deleted(false),
@@ -168,7 +168,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
}else {
push(msg);
}
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -187,7 +186,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
- mgntEnqStats(msg);
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -202,7 +200,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -527,14 +524,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
-
- if (eventMode) {
- if (eventMgr) eventMgr->enqueued(qm);
- else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
- }
- if (policy.get()) {
- policy->enqueued(qm);
- }
+ enqueued(qm);
}
copy.notify();
if (dequeueRequired) {
@@ -717,8 +707,12 @@ void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
- if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
- eventMgr->dequeued(msg);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->dequeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+ }
}
}
@@ -736,12 +730,15 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
{
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+ if (eventMode && broker) {
+ broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+ }
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (eventMgr && !eventMgr->isSync() ) {
+ } else if (broker && !(broker->getQueueEvents().isSync()) ) {
QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
}
FieldTable copy(_settings);
@@ -750,6 +747,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
} else {
setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
}
+ if (broker && broker->getManagementAgent()) {
+ ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+ }
+
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
@@ -1027,11 +1028,6 @@ SequenceNumber Queue::getPosition() {
int Queue::getEventMode() { return eventMode; }
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
- eventMgr = &mgr;
-}
-
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
@@ -1057,14 +1053,28 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
- if (m.payload) {
- if (policy.get()) {
- policy->recoverEnqueued(m.payload);
- policy->enqueued(m);
+ for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+ try {
+ (*i)->enqueued(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
- mgntEnqStats(m.payload);
+ }
+ if (policy.get()) {
+ policy->enqueued(m);
+ }
+ mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+ if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ }
+ enqueued(m);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1086,6 +1096,11 @@ void Queue::checkNotDeleted()
}
}
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ observers.insert(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 66e4c5fa22..664b1e0f01 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -31,6 +31,7 @@
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
@@ -47,6 +48,7 @@
#include <vector>
#include <memory>
#include <deque>
+#include <set>
#include <algorithm>
namespace qpid {
@@ -86,6 +88,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
~ScopedUse() { if (acquired) barrier.release(); }
};
+ typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -117,7 +120,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
qmf::org::apache::qpid::broker::Queue* mgmtObject;
RateTracker dequeueTracker;
int eventMode;
- QueueEvents* eventMgr;
+ Observers observers;
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
@@ -136,11 +139,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ void enqueued(const QueuedMessage& msg);
void dequeued(const QueuedMessage& msg);
void pop();
void popAndDequeue();
QueuedMessage getFront();
void forcePersistent(QueuedMessage& msg);
+ int getEventMode();
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -270,7 +275,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* thus are still logically on the queue) - used in
* clustered broker.
*/
- void enqueued(const QueuedMessage& msg);
+ void updateEnqueued(const QueuedMessage& msg);
/**
* Test whether the specified message (identified by its
@@ -331,8 +336,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** return current position sequence number for the next message on the queue.
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
- int getEventMode();
- void setQueueEventManager(QueueEvents&);
+ void addObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index bba054b0b8..2c540ff1ad 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -19,6 +19,8 @@
*
*/
#include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
@@ -115,6 +117,29 @@ bool QueueEvents::isSync()
return sync;
}
+class EventGenerator : public QueueObserver
+{
+ public:
+ EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {}
+ void enqueued(const QueuedMessage& m)
+ {
+ manager.enqueued(m);
+ }
+ void dequeued(const QueuedMessage& m)
+ {
+ if (!enqueueOnly) manager.dequeued(m);
+ }
+ private:
+ QueueEvents& manager;
+ const bool enqueueOnly;
+};
+
+void QueueEvents::observe(Queue& queue, bool enqueueOnly)
+{
+ boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly));
+ queue.addObserver(observer);
+}
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h
index c42752133e..fcddfe9092 100644
--- a/cpp/src/qpid/broker/QueueEvents.h
+++ b/cpp/src/qpid/broker/QueueEvents.h
@@ -63,6 +63,7 @@ class QueueEvents
QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
void enable();
void disable();
+ void observe(Queue&, bool enqueueOnly);
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
QPID_BROKER_EXTERN bool isSync();
diff --git a/cpp/src/qpid/broker/QueueObserver.h b/cpp/src/qpid/broker/QueueObserver.h
new file mode 100644
index 0000000000..2c9354cfe7
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueObserver.h
@@ -0,0 +1,82 @@
+#ifndef QPID_BROKER_QUEUEOBSERVER_H
+#define QPID_BROKER_QUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+class QueuedMessage;
+/**
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
+ */
+class QueueObserver
+{
+ public:
+ virtual void enqueued(const QueuedMessage&) = 0;
+ virtual void dequeued(const QueuedMessage&) = 0;
+ private:
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/
+#ifndef QPID_BROKER_QUEUEOBSERVER_H
+#define QPID_BROKER_QUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+class QueuedMessage;
+/**
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
+ */
+class QueueObserver
+{
+ public:
+ virtual void enqueued(const QueuedMessage&) = 0;
+ virtual void dequeued(const QueuedMessage&) = 0;
+ private:
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 28b2d60cda..ea2531dae7 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -47,7 +47,6 @@ QueueRegistry::declare(const string& declareName, bool durable,
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
queues[name] = queue;
if (lastNode) queue->setLastNodeFailure();
- if (events) queue->setQueueEventManager(*events);
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
@@ -108,8 +107,3 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode)
}
lastNode = _lastNode;
}
-
-void QueueRegistry::setQueueEvents(QueueEvents* e)
-{
- events = e;
-}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 66437f9665..57859fe639 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -96,8 +96,6 @@ class QueueRegistry {
*/
std::string generateName();
- void setQueueEvents(QueueEvents*);
-
/**
* Set the store to use. May only be called once.
*/
diff --git a/cpp/src/qpid/broker/ThresholdAlerts.cpp b/cpp/src/qpid/broker/ThresholdAlerts.cpp
new file mode 100644
index 0000000000..5e61ff0e2b
--- /dev/null
+++ b/cpp/src/qpid/broker/ThresholdAlerts.cpp
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/broker/EventQueueThresholdExceeded.h"
+
+namespace qpid {
+namespace broker {
+ThresholdAlerts::ThresholdAlerts(const std::string& n,
+ qpid::management::ManagementAgent& a,
+ const uint32_t ct,
+ const uint64_t st,
+ const long repeat)
+ : name(n), agent(a), countThreshold(ct), sizeThreshold(st),
+ repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
+ count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
+
+void ThresholdAlerts::enqueued(const QueuedMessage& m)
+{
+ size += m.payload->contentSize();
+ ++count;
+ if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
+ if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
+ || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
+ agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
+ lastAlert = qpid::sys::now();
+ }
+ }
+}
+
+void ThresholdAlerts::dequeued(const QueuedMessage& m)
+{
+ size -= m.payload->contentSize();
+ --count;
+ if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
+ lastAlert = qpid::sys::EPOCH;
+ }
+}
+
+
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const uint64_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval)
+{
+ if (countThreshold || sizeThreshold) {
+ boost::shared_ptr<QueueObserver> observer(
+ new ThresholdAlerts(queue.getName(), agent, countThreshold, sizeThreshold, repeatInterval)
+ );
+ queue.addObserver(observer);
+ }
+}
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::framing::FieldTable& settings)
+
+{
+ qpid::types::Variant::Map map;
+ qpid::amqp_0_10::translate(settings, map);
+ observe(queue, agent, map);
+}
+
+template <class T>
+class Option
+{
+ public:
+ Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
+ void addAlias(const std::string& name) { names.push_back(name); }
+ T get(const qpid::types::Variant::Map& settings) const
+ {
+ T value(defaultValue);
+ for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
+ if (get(settings, *i, value)) break;
+ }
+ return value;
+ }
+ private:
+ std::vector<std::string> names;
+ T defaultValue;
+
+ bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
+ {
+ qpid::types::Variant::Map::const_iterator i = settings.find(name);
+ if (i != settings.end()) {
+ try {
+ value = (T) i->second;
+ } catch (const qpid::types::InvalidConversion&) {
+ QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::types::Variant::Map& settings)
+
+{
+ //Note: aliases are keys defined by java broker
+ Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
+ repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
+
+ //If no explicit threshold settings were given use 80% of any
+ //limit from the policy.
+ const QueuePolicy* policy = queue.getPolicy();
+ Option<uint32_t> countThreshold("qpid.alert_count", policy ? policy->getMaxCount()*0.8 : 0);
+ countThreshold.addAlias("x-qpid-maximum-message-count");
+ Option<uint64_t> sizeThreshold("qpid.alert_size", policy ? policy->getMaxSize()*0.8 : 0);
+ sizeThreshold.addAlias("x-qpid-maximum-message-size");
+
+ observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+}
+
+}}
diff --git a/cpp/src/qpid/broker/ThresholdAlerts.h b/cpp/src/qpid/broker/ThresholdAlerts.h
new file mode 100644
index 0000000000..7437c401a8
--- /dev/null
+++ b/cpp/src/qpid/broker/ThresholdAlerts.h
@@ -0,0 +1,146 @@
+#ifndef QPID_BROKER_THRESHOLDALERTS_H
+#define QPID_BROKER_THRESHOLDALERTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace management {
+class ManagementAgent;
+}
+namespace broker {
+
+class Queue;
+/**
+ * Class to manage generation of QMF alerts when particular thresholds
+ * are breached on a queue.
+ */
+class ThresholdAlerts : public QueueObserver
+{
+ public:
+ ThresholdAlerts(const std::string& name,
+ qpid::management::ManagementAgent& agent,
+ const uint32_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const uint64_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::framing::FieldTable& settings);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::types::Variant::Map& settings);
+ private:
+ const std::string name;
+ qpid::management::ManagementAgent& agent;
+ const uint32_t countThreshold;
+ const uint64_t sizeThreshold;
+ const qpid::sys::Duration repeatInterval;
+ uint64_t count;
+ uint64_t size;
+ qpid::sys::AbsTime lastAlert;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/
+#ifndef QPID_BROKER_THRESHOLDALERTS_H
+#define QPID_BROKER_THRESHOLDALERTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace management {
+class ManagementAgent;
+}
+namespace broker {
+
+class Queue;
+/**
+ * Class to manage generation of QMF alerts when particular thresholds
+ * are breached on a queue.
+ */
+class ThresholdAlerts : public QueueObserver
+{
+ public:
+ ThresholdAlerts(const std::string& name,
+ qpid::management::ManagementAgent& agent,
+ const uint32_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const uint64_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::framing::FieldTable& settings);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::types::Variant::Map& settings);
+ private:
+ const std::string name;
+ qpid::management::ManagementAgent& agent;
+ const uint32_t countThreshold;
+ const uint64_t sizeThreshold;
+ const qpid::sys::Duration repeatInterval;
+ uint64_t count;
+ uint64_t size;
+ qpid::sys::AbsTime lastAlert;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 0582945a9c..e9b718e6de 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -529,7 +529,7 @@ void Connection::deliveryRecord(const string& qname,
m = getUpdateMessage();
m.queue = queue.get();
m.position = position;
- if (enqueued) queue->enqueued(m); //inform queue of the message
+ if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
}