From 6a87cd223a386526ccca0a1ba6f0bdd1c4320ba1 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 15 Feb 2011 14:17:45 +0000 Subject: QPID-3002: Configurable threshold alerts for queues git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1070913 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/CMakeLists.txt | 1 + cpp/src/Makefile.am | 3 + cpp/src/qpid/broker/Broker.cpp | 1 - cpp/src/qpid/broker/Queue.cpp | 67 +++++++++------ cpp/src/qpid/broker/Queue.h | 12 ++- cpp/src/qpid/broker/QueueEvents.cpp | 25 ++++++ cpp/src/qpid/broker/QueueEvents.h | 1 + cpp/src/qpid/broker/QueueObserver.h | 82 ++++++++++++++++++ cpp/src/qpid/broker/QueueRegistry.cpp | 6 -- cpp/src/qpid/broker/QueueRegistry.h | 2 - cpp/src/qpid/broker/ThresholdAlerts.cpp | 139 ++++++++++++++++++++++++++++++ cpp/src/qpid/broker/ThresholdAlerts.h | 146 ++++++++++++++++++++++++++++++++ cpp/src/qpid/cluster/Connection.cpp | 2 +- 13 files changed, 447 insertions(+), 40 deletions(-) create mode 100644 cpp/src/qpid/broker/QueueObserver.h create mode 100644 cpp/src/qpid/broker/ThresholdAlerts.cpp create mode 100644 cpp/src/qpid/broker/ThresholdAlerts.h diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index a709056899..71dce075a2 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1011,6 +1011,7 @@ set (qpidbroker_SOURCES qpid/broker/SessionHandler.h qpid/broker/SessionHandler.cpp qpid/broker/System.cpp + qpid/broker/ThresholdAlerts.cpp qpid/broker/TopicExchange.cpp qpid/broker/TxAccept.cpp qpid/broker/TxBuffer.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index fc93653017..4e398ddc5e 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -603,6 +603,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueEvents.h \ qpid/broker/QueueListeners.cpp \ qpid/broker/QueueListeners.h \ + qpid/broker/QueueObserver.h \ qpid/broker/QueuePolicy.cpp \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.cpp \ @@ -649,6 +650,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SignalHandler.h \ qpid/broker/System.cpp \ qpid/broker/System.h \ + qpid/broker/ThresholdAlerts.cpp \ + qpid/broker/ThresholdAlerts.h \ qpid/broker/TopicExchange.cpp \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index cb301916c2..3c692fc368 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -221,7 +221,6 @@ Broker::Broker(const Broker::Options& conf) : } QueuePolicy::setDefaultMaxSize(conf.queueLimit); - queues.setQueueEvents(&queueEvents); // Early-Initialize plugins Plugin::earlyInitAll(*this); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 43d1a2b27c..d5dc3e85f1 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -104,7 +105,6 @@ Queue::Queue(const string& _name, bool _autodelete, policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0), insertSeqNo(0), broker(b), deleted(false), @@ -168,7 +168,6 @@ void Queue::deliver(boost::intrusive_ptr msg){ }else { push(msg); } - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -187,7 +186,6 @@ void Queue::recover(boost::intrusive_ptr& msg){ msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued - mgntEnqStats(msg); if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -202,7 +200,6 @@ void Queue::recover(boost::intrusive_ptr& msg){ void Queue::process(boost::intrusive_ptr& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -527,14 +524,7 @@ void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ dequeueRequired = messages->push(qm, removed); listeners.populate(copy); - - if (eventMode) { - if (eventMgr) eventMgr->enqueued(qm); - else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); - } - if (policy.get()) { - policy->enqueued(qm); - } + enqueued(qm); } copy.notify(); if (dequeueRequired) { @@ -717,8 +707,12 @@ void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); - if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { - eventMgr->dequeued(msg); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->dequeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); + } } } @@ -736,12 +730,15 @@ void Queue::configure(const FieldTable& _settings, bool recovering) { eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (eventMode && broker) { + broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); + } if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); - } else if (eventMgr && !eventMgr->isSync() ) { + } else if (broker && !(broker->getQueueEvents().isSync()) ) { QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); } FieldTable copy(_settings); @@ -750,6 +747,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } else { setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); } + if (broker && broker->getManagementAgent()) { + ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings); + } + //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); @@ -1027,11 +1028,6 @@ SequenceNumber Queue::getPosition() { int Queue::getEventMode() { return eventMode; } -void Queue::setQueueEventManager(QueueEvents& mgr) -{ - eventMgr = &mgr; -} - void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange @@ -1057,14 +1053,28 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { - if (m.payload) { - if (policy.get()) { - policy->recoverEnqueued(m.payload); - policy->enqueued(m); + for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { + try { + (*i)->enqueued(m); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } - mgntEnqStats(m.payload); + } + if (policy.get()) { + policy->enqueued(m); + } + mgntEnqStats(m.payload); +} + +void Queue::updateEnqueued(const QueuedMessage& m) +{ + if (m.payload) { boost::intrusive_ptr payload = m.payload; enqueue ( 0, payload, true ); + if (policy.get()) { + policy->recoverEnqueued(payload); + } + enqueued(m); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1086,6 +1096,11 @@ void Queue::checkNotDeleted() } } +void Queue::addObserver(boost::shared_ptr observer) +{ + observers.insert(observer); +} + void Queue::flush() { ScopedUse u(barrier); diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 66e4c5fa22..664b1e0f01 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -31,6 +31,7 @@ #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" @@ -47,6 +48,7 @@ #include #include #include +#include #include namespace qpid { @@ -86,6 +88,7 @@ class Queue : public boost::enable_shared_from_this, ~ScopedUse() { if (acquired) barrier.release(); } }; + typedef std::set< boost::shared_ptr > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -117,7 +120,7 @@ class Queue : public boost::enable_shared_from_this, qmf::org::apache::qpid::broker::Queue* mgmtObject; RateTracker dequeueTracker; int eventMode; - QueueEvents* eventMgr; + Observers observers; bool insertSeqNo; std::string seqNoKey; Broker* broker; @@ -136,11 +139,13 @@ class Queue : public boost::enable_shared_from_this, bool isExcluded(boost::intrusive_ptr& msg); + void enqueued(const QueuedMessage& msg); void dequeued(const QueuedMessage& msg); void pop(); void popAndDequeue(); QueuedMessage getFront(); void forcePersistent(QueuedMessage& msg); + int getEventMode(); inline void mgntEnqStats(const boost::intrusive_ptr& msg) { @@ -270,7 +275,7 @@ class Queue : public boost::enable_shared_from_this, * thus are still logically on the queue) - used in * clustered broker. */ - void enqueued(const QueuedMessage& msg); + void updateEnqueued(const QueuedMessage& msg); /** * Test whether the specified message (identified by its @@ -331,8 +336,7 @@ class Queue : public boost::enable_shared_from_this, /** return current position sequence number for the next message on the queue. */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); - int getEventMode(); - void setQueueEventManager(QueueEvents&); + void addObserver(boost::shared_ptr); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); /** * Notify queue that recovery has completed. diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index bba054b0b8..2c540ff1ad 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -19,6 +19,8 @@ * */ #include "qpid/broker/QueueEvents.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" @@ -115,6 +117,29 @@ bool QueueEvents::isSync() return sync; } +class EventGenerator : public QueueObserver +{ + public: + EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {} + void enqueued(const QueuedMessage& m) + { + manager.enqueued(m); + } + void dequeued(const QueuedMessage& m) + { + if (!enqueueOnly) manager.dequeued(m); + } + private: + QueueEvents& manager; + const bool enqueueOnly; +}; + +void QueueEvents::observe(Queue& queue, bool enqueueOnly) +{ + boost::shared_ptr observer(new EventGenerator(*this, enqueueOnly)); + queue.addObserver(observer); +} + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/cpp/src/qpid/broker/QueueEvents.h b/cpp/src/qpid/broker/QueueEvents.h index c42752133e..fcddfe9092 100644 --- a/cpp/src/qpid/broker/QueueEvents.h +++ b/cpp/src/qpid/broker/QueueEvents.h @@ -63,6 +63,7 @@ class QueueEvents QPID_BROKER_EXTERN void unregisterListener(const std::string& id); void enable(); void disable(); + void observe(Queue&, bool enqueueOnly); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); QPID_BROKER_EXTERN bool isSync(); diff --git a/cpp/src/qpid/broker/QueueObserver.h b/cpp/src/qpid/broker/QueueObserver.h new file mode 100644 index 0000000000..2c9354cfe7 --- /dev/null +++ b/cpp/src/qpid/broker/QueueObserver.h @@ -0,0 +1,82 @@ +#ifndef QPID_BROKER_QUEUEOBSERVER_H +#define QPID_BROKER_QUEUEOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace broker { + +class QueuedMessage; +/** + * Interface for notifying classes who want to act as 'observers' of a + * queue of particular events. + */ +class QueueObserver +{ + public: + virtual void enqueued(const QueuedMessage&) = 0; + virtual void dequeued(const QueuedMessage&) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/ +#ifndef QPID_BROKER_QUEUEOBSERVER_H +#define QPID_BROKER_QUEUEOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace broker { + +class QueuedMessage; +/** + * Interface for notifying classes who want to act as 'observers' of a + * queue of particular events. + */ +class QueueObserver +{ + public: + virtual void enqueued(const QueuedMessage&) = 0; + virtual void dequeued(const QueuedMessage&) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/ diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 28b2d60cda..ea2531dae7 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -47,7 +47,6 @@ QueueRegistry::declare(const string& declareName, bool durable, Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); queues[name] = queue; if (lastNode) queue->setLastNodeFailure(); - if (events) queue->setQueueEventManager(*events); return std::pair(queue, true); } else { @@ -108,8 +107,3 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode) } lastNode = _lastNode; } - -void QueueRegistry::setQueueEvents(QueueEvents* e) -{ - events = e; -} diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 66437f9665..57859fe639 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -96,8 +96,6 @@ class QueueRegistry { */ std::string generateName(); - void setQueueEvents(QueueEvents*); - /** * Set the store to use. May only be called once. */ diff --git a/cpp/src/qpid/broker/ThresholdAlerts.cpp b/cpp/src/qpid/broker/ThresholdAlerts.cpp new file mode 100644 index 0000000000..5e61ff0e2b --- /dev/null +++ b/cpp/src/qpid/broker/ThresholdAlerts.cpp @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/broker/EventQueueThresholdExceeded.h" + +namespace qpid { +namespace broker { +ThresholdAlerts::ThresholdAlerts(const std::string& n, + qpid::management::ManagementAgent& a, + const uint32_t ct, + const uint64_t st, + const long repeat) + : name(n), agent(a), countThreshold(ct), sizeThreshold(st), + repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0), + count(0), size(0), lastAlert(qpid::sys::EPOCH) {} + +void ThresholdAlerts::enqueued(const QueuedMessage& m) +{ + size += m.payload->contentSize(); + ++count; + if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) { + if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH) + || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) { + agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size)); + lastAlert = qpid::sys::now(); + } + } +} + +void ThresholdAlerts::dequeued(const QueuedMessage& m) +{ + size -= m.payload->contentSize(); + --count; + if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) { + lastAlert = qpid::sys::EPOCH; + } +} + + + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval) +{ + if (countThreshold || sizeThreshold) { + boost::shared_ptr observer( + new ThresholdAlerts(queue.getName(), agent, countThreshold, sizeThreshold, repeatInterval) + ); + queue.addObserver(observer); + } +} + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings) + +{ + qpid::types::Variant::Map map; + qpid::amqp_0_10::translate(settings, map); + observe(queue, agent, map); +} + +template +class Option +{ + public: + Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); } + void addAlias(const std::string& name) { names.push_back(name); } + T get(const qpid::types::Variant::Map& settings) const + { + T value(defaultValue); + for (std::vector::const_iterator i = names.begin(); i != names.end(); ++i) { + if (get(settings, *i, value)) break; + } + return value; + } + private: + std::vector names; + T defaultValue; + + bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const + { + qpid::types::Variant::Map::const_iterator i = settings.find(name); + if (i != settings.end()) { + try { + value = (T) i->second; + } catch (const qpid::types::InvalidConversion&) { + QPID_LOG(warning, "Bad value for" << name << ": " << i->second); + } + return true; + } else { + return false; + } + } +}; + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings) + +{ + //Note: aliases are keys defined by java broker + Option repeatInterval("qpid.alert_repeat_gap", 60); + repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap"); + + //If no explicit threshold settings were given use 80% of any + //limit from the policy. + const QueuePolicy* policy = queue.getPolicy(); + Option countThreshold("qpid.alert_count", policy ? policy->getMaxCount()*0.8 : 0); + countThreshold.addAlias("x-qpid-maximum-message-count"); + Option sizeThreshold("qpid.alert_size", policy ? policy->getMaxSize()*0.8 : 0); + sizeThreshold.addAlias("x-qpid-maximum-message-size"); + + observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings)); +} + +}} diff --git a/cpp/src/qpid/broker/ThresholdAlerts.h b/cpp/src/qpid/broker/ThresholdAlerts.h new file mode 100644 index 0000000000..7437c401a8 --- /dev/null +++ b/cpp/src/qpid/broker/ThresholdAlerts.h @@ -0,0 +1,146 @@ +#ifndef QPID_BROKER_THRESHOLDALERTS_H +#define QPID_BROKER_THRESHOLDALERTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueObserver.h" +#include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" +#include + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace management { +class ManagementAgent; +} +namespace broker { + +class Queue; +/** + * Class to manage generation of QMF alerts when particular thresholds + * are breached on a queue. + */ +class ThresholdAlerts : public QueueObserver +{ + public: + ThresholdAlerts(const std::string& name, + qpid::management::ManagementAgent& agent, + const uint32_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings); + private: + const std::string name; + qpid::management::ManagementAgent& agent; + const uint32_t countThreshold; + const uint64_t sizeThreshold; + const qpid::sys::Duration repeatInterval; + uint64_t count; + uint64_t size; + qpid::sys::AbsTime lastAlert; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/ +#ifndef QPID_BROKER_THRESHOLDALERTS_H +#define QPID_BROKER_THRESHOLDALERTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueObserver.h" +#include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" +#include + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace management { +class ManagementAgent; +} +namespace broker { + +class Queue; +/** + * Class to manage generation of QMF alerts when particular thresholds + * are breached on a queue. + */ +class ThresholdAlerts : public QueueObserver +{ + public: + ThresholdAlerts(const std::string& name, + qpid::management::ManagementAgent& agent, + const uint32_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings); + private: + const std::string name; + qpid::management::ManagementAgent& agent; + const uint32_t countThreshold; + const uint64_t sizeThreshold; + const qpid::sys::Duration repeatInterval; + uint64_t count; + uint64_t size; + qpid::sys::AbsTime lastAlert; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/ diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 0582945a9c..e9b718e6de 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -529,7 +529,7 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->enqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } -- cgit v1.2.1