diff options
author | Alan Conway <aconway@apache.org> | 2010-01-29 22:59:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-29 22:59:09 +0000 |
commit | a78bf7b9144ed3db8e798124595f48fc75231cce (patch) | |
tree | b7284043fe639a2c6a880fc33836ce0b51d21b7e /cpp | |
parent | 726b23f43478a85b961365e4de3a9302a261f6b3 (diff) | |
download | qpid-python-a78bf7b9144ed3db8e798124595f48fc75231cce.tar.gz |
Replace PeriodicTimer with ClusterTimer, which inherits from Timer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 5 | ||||
-rw-r--r-- | cpp/src/cluster.cmake | 4 | ||||
-rw-r--r-- | cpp/src/cluster.mk | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterTimer.cpp | 114 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterTimer.h | 58 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Timer.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Timer.h | 39 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 8 |
15 files changed, 298 insertions, 78 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 6570740872..9270515515 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -573,7 +573,6 @@ set (qpidcommon_SOURCES qpid/sys/Runnable.cpp qpid/sys/Shlib.cpp qpid/sys/Timer.cpp - qpid/sys/PeriodicTimerImpl.cpp ) add_library (qpidcommon SHARED ${qpidcommon_SOURCES}) @@ -735,7 +734,6 @@ set (qpidbroker_SOURCES qpid/broker/Connection.cpp qpid/broker/ConnectionHandler.cpp qpid/broker/ConnectionFactory.cpp - qpid/broker/DelegatingPeriodicTimer.cpp qpid/broker/DeliverableMessage.cpp qpid/broker/DeliveryRecord.cpp qpid/broker/DirectExchange.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 58214d42c6..bc8531af92 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -471,9 +471,6 @@ libqpidcommon_la_SOURCES += \ qpid/sys/TimeoutHandler.h \ qpid/sys/Timer.cpp \ qpid/sys/Timer.h \ - qpid/sys/PeriodicTimer.h \ - qpid/sys/PeriodicTimerImpl.h \ - qpid/sys/PeriodicTimerImpl.cpp \ qpid/sys/Waitable.h \ qpid/sys/alloca.h \ qpid/sys/uuid.h @@ -508,8 +505,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Consumer.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ - qpid/broker/DelegatingPeriodicTimer.h \ - qpid/broker/DelegatingPeriodicTimer.cpp \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliverableMessage.h \ diff --git a/cpp/src/cluster.cmake b/cpp/src/cluster.cmake index 41b048382b..8f886e7f3f 100644 --- a/cpp/src/cluster.cmake +++ b/cpp/src/cluster.cmake @@ -81,6 +81,8 @@ if (BUILD_CLUSTER) ${CMAN_SOURCES} qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h + qpid/cluster/ClusterTimer.cpp + qpid/cluster/ClusterTimer.h qpid/cluster/Decoder.cpp qpid/cluster/Decoder.h qpid/cluster/PollableQueue.h @@ -129,8 +131,6 @@ if (BUILD_CLUSTER) qpid/cluster/MemberSet.h qpid/cluster/MemberSet.cpp qpid/cluster/types.h - qpid/cluster/PeriodicTimerImpl.h - qpid/cluster/PeriodicTimerImpl.cpp qpid/cluster/StoreStatus.h qpid/cluster/StoreStatus.cpp ) diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index db2c20f5eb..8e95747c4d 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -40,6 +40,8 @@ cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ + qpid/cluster/ClusterTimer.cpp \ + qpid/cluster/ClusterTimer.h \ qpid/cluster/Decoder.cpp \ qpid/cluster/Decoder.h \ qpid/cluster/PollableQueue.h \ @@ -88,8 +90,6 @@ cluster_la_SOURCES = \ qpid/cluster/MemberSet.h \ qpid/cluster/MemberSet.cpp \ qpid/cluster/types.h \ - qpid/cluster/PeriodicTimerImpl.h \ - qpid/cluster/PeriodicTimerImpl.cpp \ qpid/cluster/StoreStatus.h \ qpid/cluster/StoreStatus.cpp diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index cd3b014256..cbccca6eea 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -49,7 +49,6 @@ #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/SystemInfo.h" -#include "qpid/sys/PeriodicTimerImpl.h" #include "qpid/Address.h" #include "qpid/Url.h" #include "qpid/Version.h" @@ -257,11 +256,7 @@ Broker::Broker(const Broker::Options& conf) : // Initialize plugins Plugin::initializeAll(*this); - if (!periodicTimer.hasDelegate()) { - // If no plugin has contributed a PeriodicTimer, use the default one. - periodicTimer.setDelegate( - std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer))); - } + if (managementAgent.get()) managementAgent->pluginsInitialized(); if (conf.queueCleanInterval) { queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); @@ -474,8 +469,8 @@ Broker::getKnownBrokersImpl() return knownBrokers; } -void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { - periodicTimer.setDelegate(pt); +void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { + clusterTimer = t; } const std::string Broker::TCP_TRANSPORT("tcp"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 302ef74e3d..465a17f4eb 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -25,7 +25,6 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/ConnectionFactory.h" #include "qpid/broker/ConnectionToken.h" -#include "qpid/broker/DelegatingPeriodicTimer.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" @@ -147,7 +146,7 @@ public: boost::shared_ptr<sys::Poller> poller; sys::Timer timer; - DelegatingPeriodicTimer periodicTimer; + std::auto_ptr<sys::Timer> clusterTimer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; @@ -254,9 +253,12 @@ public: boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } + /** Timer for local tasks affecting only this broker */ sys::Timer& getTimer() { return timer; } - sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; } - void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt); + + /** Timer for tasks that must be synchronized if we are in a cluster */ + sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; } + void setClusterTimer(std::auto_ptr<sys::Timer>); boost::function<std::vector<Url> ()> getKnownBrokers; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0d0fb7bcee..7dd8c7e62c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -112,7 +112,7 @@ #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" #include "qpid/cluster/UpdateExchange.h" -#include "qpid/cluster/PeriodicTimerImpl.h" +#include "qpid/cluster/ClusterTimer.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -137,7 +137,7 @@ #include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" -#include "qpid/framing/ClusterPeriodicTimerBody.h" +#include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -179,7 +179,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 903171; +const uint32_t Cluster::CLUSTER_VERSION = 904565; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -209,9 +209,8 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } - void periodicTimer(const std::string& name) { - cluster.periodicTimer(member, name, l); - } + void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } + void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } @@ -245,6 +244,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : state(INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), + elder(false), lastSize(0), lastBroker(false), updateRetracted(false), @@ -252,8 +252,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : { // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. - timer = new PeriodicTimerImpl(*this); - broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer)); + timer = new ClusterTimer(*this); + broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); mAgent = broker.getManagementAgent(); if (mAgent != 0){ @@ -577,14 +577,13 @@ void Cluster::initMapCompleted(Lock& l) { initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); - if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. + if (elders.empty()) + becomeElder(); + else { broker.getLinks().setPassive(true); broker.getQueueEvents().disable(); QPID_LOG(info, *this << " not active for links."); } - else { - QPID_LOG(info, this << " active for links."); - } setClusterId(initMap.getClusterId(), l); if (store.hasStore()) store.dirty(clusterId); @@ -636,14 +635,19 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& if (state >= CATCHUP && memberChange) { memberUpdate(l); - if (elders.empty()) { - // We are the oldest, reactive links if necessary - QPID_LOG(info, this << " becoming active for links."); - broker.getLinks().setPassive(false); - } + if (elders.empty()) becomeElder(); } } +void Cluster::becomeElder() { + if (elder) return; // We were already the elder. + // We are the oldest, reactive links if necessary + QPID_LOG(info, *this << " became the elder, active for links."); + elder = true; + broker.getLinks().setPassive(false); + timer->becomeElder(); +} + void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -962,13 +966,17 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu error.respondNone(from, type, frameSeq); } -void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) { - timer->deliver(name); +void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { + timer->deliverWakeup(name); +} + +void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { + timer->deliverDrop(name); } bool Cluster::isElder() const { Monitor::ScopedLock l(lock); - return state >= CATCHUP && elders.empty(); + return elder; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 08911081ea..977c873e29 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -63,7 +63,7 @@ namespace cluster { class Connection; class EventFrame; -class PeriodicTimerImpl; +class ClusterTimer; /** * Connection to the cluster @@ -164,7 +164,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); - void periodicTimer(const MemberId&, const std::string& name, Lock&); + void timerWakeup(const MemberId&, const std::string& name, Lock&); + void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); @@ -201,6 +202,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { const struct cpg_address */*joined*/, int /*nJoined*/ ); + void becomeElder(); + // == Called in management threads. virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); @@ -265,6 +268,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { StoreStatus store; ClusterMap map; MemberSet elders; + bool elder; size_t lastSize; bool lastBroker; sys::Thread updateThread; @@ -272,7 +276,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool updateRetracted; ErrorCheck error; UpdateReceiver updateReceiver; - PeriodicTimerImpl* timer; + ClusterTimer* timer; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp new file mode 100644 index 0000000000..612758152f --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cluster.h" +#include "ClusterTimer.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterTimerWakeupBody.h" +#include "qpid/framing/ClusterTimerDropBody.h" + +namespace qpid { +namespace cluster { + +using boost::intrusive_ptr; +using std::max; +using sys::Timer; +using sys::TimerTask; + + +ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {} + +ClusterTimer::~ClusterTimer() {} + +// Initialization or deliver thread. +void ClusterTimer::add(intrusive_ptr<TimerTask> task) +{ + QPID_LOG(trace, "Adding cluster timer task " << task->getName()); + Map::iterator i = map.find(task->getName()); + if (i != map.end()) + throw Exception(QPID_MSG("Task already exists with name " << task->getName())); + map[task->getName()] = task; + // Only the elder actually activates the task with the Timer base class. + if (cluster.isElder()) { + QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); + Timer::add(task); + } +} + +// Timer thread +void ClusterTimer::fire(intrusive_ptr<TimerTask> t) { + // Elder mcasts wakeup on fire, task is not fired until deliverWakeup + if (cluster.isElder()) { + QPID_LOG(trace, "Sending cluster timer wakeup " << t->getName()); + cluster.getMulticast().mcastControl( + framing::ClusterTimerWakeupBody(framing::ProtocolVersion(), t->getName()), + cluster.getId()); + } + else + QPID_LOG(trace, "Cluster timer task fired, but not elder " << t->getName()); +} + +// Timer thread +void ClusterTimer::drop(intrusive_ptr<TimerTask> t) { + // Elder mcasts drop, task is droped in deliverDrop + if (cluster.isElder()) { + QPID_LOG(trace, "Sending cluster timer drop " << t->getName()); + cluster.getMulticast().mcastControl( + framing::ClusterTimerDropBody(framing::ProtocolVersion(), t->getName()), + cluster.getId()); + } + else + QPID_LOG(trace, "Cluster timer task dropped, but not on elder " << t->getName()); +} + +// Deliver thread +void ClusterTimer::deliverWakeup(const std::string& name) { + QPID_LOG(trace, "Cluster timer wakeup delivered for " << name); + Map::iterator i = map.find(name); + if (i == map.end()) + throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name)); + else { + intrusive_ptr<TimerTask> t = i->second; + map.erase(i); + Timer::fire(t); + } +} + +// Deliver thread +void ClusterTimer::deliverDrop(const std::string& name) { + QPID_LOG(trace, "Cluster timer drop delivered for " << name); + Map::iterator i = map.find(name); + if (i == map.end()) + throw Exception(QPID_MSG("Cluster timer drop non-existent task " << name)); + else { + intrusive_ptr<TimerTask> t = i->second; + map.erase(i); + } +} + +// Deliver thread +void ClusterTimer::becomeElder() { + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + Timer::add(i->second); + } +} + +}} diff --git a/cpp/src/qpid/cluster/ClusterTimer.h b/cpp/src/qpid/cluster/ClusterTimer.h new file mode 100644 index 0000000000..395e505451 --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterTimer.h @@ -0,0 +1,58 @@ +#ifndef QPID_CLUSTER_CLUSTERTIMER_H +#define QPID_CLUSTER_CLUSTERTIMER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Timer.h" +#include <map> + +namespace qpid { +namespace cluster { + +class Cluster; + +class ClusterTimer : public sys::Timer { + public: + ClusterTimer(Cluster&); + ~ClusterTimer(); + + void add(boost::intrusive_ptr<sys::TimerTask> task); + + void deliverWakeup(const std::string& name); + void deliverDrop(const std::string& name); + void becomeElder(); + + protected: + void fire(boost::intrusive_ptr<sys::TimerTask> task); + void drop(boost::intrusive_ptr<sys::TimerTask> task); + + private: + typedef std::map<std::string, boost::intrusive_ptr<sys::TimerTask> > Map; + Cluster& cluster; + Map map; +}; + + +}} + + +#endif /*!QPID_CLUSTER_CLUSTERTIMER_H*/ diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 9f7d8046d4..e00f394a01 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -52,7 +52,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } ManagementAgent::ManagementAgent () : - threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) + threadPoolSize(1), interval(10), broker(0), timer(0), + startTime(uint64_t(Duration(now()))) { nextObjectId = 1; brokerBank = 1; @@ -91,12 +92,8 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, dataDir = _dataDir; interval = _interval; broker = _broker; - timer = &_broker->getPeriodicTimer(); threadPoolSize = _threads; ManagementObject::maxThreads = threadPoolSize; - timer->add (boost::bind(&ManagementAgent::periodicProcessing, this), - interval * sys::TIME_SEC, - "ManagementAgent::periodicProcessing"); // Get from file or generate and save to file. if (dataDir.empty()) @@ -135,6 +132,12 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, } } +void ManagementAgent::pluginsInitialized() { + // Do this here so cluster plugin has the chance to set up the timer. + timer = &broker->getClusterTimer(); + timer->add(new Periodic(*this, interval)); +} + void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); @@ -233,6 +236,19 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); } +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC), + "ManagementAgent::periodicProcessing"), + agent(_agent) {} + +ManagementAgent::Periodic::~Periodic () {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer->add (new Periodic (agent, agent.interval)); + agent.periodicProcessing (); +} + void ManagementAgent::clientAdded (const std::string& routingKey) { if (routingKey.find("console") != 0) diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index ca8bfe97ed..b9ac54c064 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -26,7 +26,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/PeriodicTimer.h" +#include "qpid/sys/Timer.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/ManagementEvent.h" @@ -65,8 +65,12 @@ public: ManagementAgent (); virtual ~ManagementAgent (); + /** Called before plugins are initialized */ void configure (const std::string& dataDir, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); + /** Called after plugins are initialized. */ + void pluginsInitialized(); + void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); @@ -112,6 +116,15 @@ public: void setBootSequence(uint16_t b) { bootSequence = b; } private: + struct Periodic : public qpid::sys::TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + virtual ~Periodic (); + void fire (); + }; + // Storage for tracking remote management agents, attached via the client // management agent API. // @@ -203,7 +216,7 @@ private: std::string dataDir; uint16_t interval; qpid::broker::Broker* broker; - qpid::sys::PeriodicTimer* timer; + qpid::sys::Timer* timer; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; diff --git a/cpp/src/qpid/sys/Timer.cpp b/cpp/src/qpid/sys/Timer.cpp index c18fd93538..fcd58b187f 100644 --- a/cpp/src/qpid/sys/Timer.cpp +++ b/cpp/src/qpid/sys/Timer.cpp @@ -30,14 +30,16 @@ using std::max; namespace qpid { namespace sys { -TimerTask::TimerTask(Duration timeout) : +TimerTask::TimerTask(Duration timeout, const std::string& n) : + name(n), sortTime(AbsTime::FarFuture()), period(timeout), nextFireTime(AbsTime::now(), timeout), cancelled(false) {} -TimerTask::TimerTask(AbsTime time) : +TimerTask::TimerTask(AbsTime time, const std::string& n) : + name(n), sortTime(AbsTime::FarFuture()), period(0), nextFireTime(time), @@ -102,13 +104,15 @@ void Timer::run() { ScopedLock<Mutex> l(t->callbackLock); if (t->cancelled) { + drop(t); if (delay > 500 * TIME_MSEC) { - QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC << "ms late"); + QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC + << "ms late"); } continue; } else if(Duration(t->nextFireTime, start) >= 0) { Monitor::ScopedUnlock u(monitor); - t->fireTask(); + fire(t); // Warn on callback overrun AbsTime end(AbsTime::now()); Duration overrun(tasks.top()->nextFireTime, end); @@ -169,6 +173,14 @@ void Timer::stop() runner.join(); } +// Allow subclasses to override behavior when firing a task. +void Timer::fire(boost::intrusive_ptr<TimerTask> t) { + t->fireTask(); +} + +// Provided for subclasses: called when a task is droped. +void Timer::drop(boost::intrusive_ptr<TimerTask>) {} + bool operator<(const intrusive_ptr<TimerTask>& a, const intrusive_ptr<TimerTask>& b) { diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h index 303d44a299..4a579fe032 100644 --- a/cpp/src/qpid/sys/Timer.h +++ b/cpp/src/qpid/sys/Timer.h @@ -38,10 +38,11 @@ namespace sys { class Timer; class TimerTask : public RefCounted { - friend class Timer; - friend bool operator<(const boost::intrusive_ptr<TimerTask>&, - const boost::intrusive_ptr<TimerTask>&); + friend class Timer; + friend bool operator<(const boost::intrusive_ptr<TimerTask>&, + const boost::intrusive_ptr<TimerTask>&); + std::string name; AbsTime sortTime; Duration period; AbsTime nextFireTime; @@ -51,30 +52,26 @@ class TimerTask : public RefCounted { bool readyToFire() const; void fireTask(); -public: - QPID_COMMON_EXTERN TimerTask(Duration period); - QPID_COMMON_EXTERN TimerTask(AbsTime fireTime); + public: + QPID_COMMON_EXTERN TimerTask(Duration period, const std::string& name=std::string()); + QPID_COMMON_EXTERN TimerTask(AbsTime fireTime, const std::string& name=std::string()); QPID_COMMON_EXTERN virtual ~TimerTask(); QPID_COMMON_EXTERN void setupNextFire(); QPID_COMMON_EXTERN void restart(); QPID_COMMON_EXTERN void cancel(); -protected: + std::string getName() const { return name; } + + protected: // Must be overridden with callback virtual void fire() = 0; }; // For the priority_queue order bool operator<(const boost::intrusive_ptr<TimerTask>& a, - const boost::intrusive_ptr<TimerTask>& b); - -/** - A timer to trigger tasks that are local to one broker. + const boost::intrusive_ptr<TimerTask>& b); - For periodic tasks that should be synchronized across all brokers - in a cluster, use qpid::sys::PeriodicTimer. - */ class Timer : private Runnable { qpid::sys::Monitor monitor; std::priority_queue<boost::intrusive_ptr<TimerTask> > tasks; @@ -84,13 +81,17 @@ class Timer : private Runnable { // Runnable interface void run(); -public: + public: QPID_COMMON_EXTERN Timer(); - QPID_COMMON_EXTERN ~Timer(); + QPID_COMMON_EXTERN virtual ~Timer(); + + QPID_COMMON_EXTERN virtual void add(boost::intrusive_ptr<TimerTask> task); + QPID_COMMON_EXTERN virtual void start(); + QPID_COMMON_EXTERN virtual void stop(); - QPID_COMMON_EXTERN void add(boost::intrusive_ptr<TimerTask> task); - QPID_COMMON_EXTERN void start(); - QPID_COMMON_EXTERN void stop(); + protected: + QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task); + QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task); }; diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 569cebaf14..33553fe7f8 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -95,8 +95,12 @@ <field name="frame-seq" type="sequence-no"/> </control> - <!-- Synchronize periodic timer tasks across the cluster --> - <control name="periodic-timer" code="0x15"> + <!-- Synchronize timer tasks across the cluster --> + <control name="timer-wakeup" code="0x15"> + <field name="name" type="str16"/> + </control> + + <control name="timer-drop" code="0x16"> <field name="name" type="str16"/> </control> |