diff options
author | Alan Conway <aconway@apache.org> | 2010-01-27 22:21:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-27 22:21:28 +0000 |
commit | 62315ddbbfcd5d41fd674bd2eb62f93d333d9ad4 (patch) | |
tree | 1dbb2bd598e496db5779fe420e0dab6d456aa71c /cpp/src | |
parent | 863b9e190616873c561a3f468f01e0fc793cd466 (diff) | |
download | qpid-python-62315ddbbfcd5d41fd674bd2eb62f93d333d9ad4.tar.gz |
QPID_2634 Management updates in timer create inconsistencies in a cluster.
Cluster plugin provides a PeriodicTimer implementation to the broker
which executes tasks in the cluster dispatch thread simultaneously
across the cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp | 50 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DelegatingPeriodicTimer.h | 57 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/PeriodicTimerImpl.cpp | 14 |
9 files changed, 145 insertions, 17 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index fd77e34619..1727de6eb0 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -733,6 +733,7 @@ set (qpidbroker_SOURCES qpid/broker/Connection.cpp qpid/broker/ConnectionHandler.cpp qpid/broker/ConnectionFactory.cpp + qpid/broker/DelegatingPeriodicTimer.cpp qpid/broker/DeliverableMessage.cpp qpid/broker/DeliveryRecord.cpp qpid/broker/DirectExchange.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 5886c53ff9..0e1b330245 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -508,6 +508,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Consumer.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ + qpid/broker/DelegatingPeriodicTimer.h \ + qpid/broker/DelegatingPeriodicTimer.cpp \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliverableMessage.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index f47b6418bd..cd3b014256 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -137,7 +137,6 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), - periodicTimer(new sys::PeriodicTimerImpl(timer)), config(conf), managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), store(new NullMessageStore), @@ -258,6 +257,12 @@ Broker::Broker(const Broker::Options& conf) : // Initialize plugins Plugin::initializeAll(*this); + if (!periodicTimer.hasDelegate()) { + // If no plugin has contributed a PeriodicTimer, use the default one. + periodicTimer.setDelegate( + std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer))); + } + if (conf.queueCleanInterval) { queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); } @@ -469,6 +474,10 @@ Broker::getKnownBrokersImpl() return knownBrokers; } +void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { + periodicTimer.setDelegate(pt); +} + const std::string Broker::TCP_TRANSPORT("tcp"); }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 04d62306da..302ef74e3d 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -25,6 +25,7 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/ConnectionFactory.h" #include "qpid/broker/ConnectionToken.h" +#include "qpid/broker/DelegatingPeriodicTimer.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" @@ -49,7 +50,6 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Timer.h" -#include "qpid/sys/PeriodicTimer.h" #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" #include "qpid/sys/Mutex.h" @@ -147,7 +147,7 @@ public: boost::shared_ptr<sys::Poller> poller; sys::Timer timer; - std::auto_ptr<sys::PeriodicTimer> periodicTimer; + DelegatingPeriodicTimer periodicTimer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; @@ -255,8 +255,8 @@ public: void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } sys::Timer& getTimer() { return timer; } - sys::PeriodicTimer& getPeriodicTimer() { return *periodicTimer; } - void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { periodicTimer = pt; } + sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; } + void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt); boost::function<std::vector<Url> ()> getKnownBrokers; diff --git a/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp b/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp new file mode 100644 index 0000000000..111d968543 --- /dev/null +++ b/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DelegatingPeriodicTimer.h" + +namespace qpid { +namespace broker { + +DelegatingPeriodicTimer::DelegatingPeriodicTimer() {} + +void DelegatingPeriodicTimer::add( + const Task& task, sys::Duration period, const std::string& taskName) +{ + if (delegate.get()) + delegate->add(task, period, taskName); + else { + Entry e; + e.task = task; + e.period = period; + e.name = taskName; + entries.push_back(e); + } +} + +void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) { + assert(impl.get()); + assert(!delegate.get()); + delegate = impl; + for (Entries::iterator i = entries.begin(); i != entries.end(); ++i) + delegate->add(i->task, i->period, i->name); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/DelegatingPeriodicTimer.h b/cpp/src/qpid/broker/DelegatingPeriodicTimer.h new file mode 100644 index 0000000000..5186f41c3e --- /dev/null +++ b/cpp/src/qpid/broker/DelegatingPeriodicTimer.h @@ -0,0 +1,57 @@ +#ifndef QPID_BROKER_PERIODICTIMER_H +#define QPID_BROKER_PERIODICTIMER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PeriodicTimer.h" +#include <vector> +#include <memory> + +namespace qpid { +namespace broker { + +/** + * A PeriodicTimer implementation that delegates to another PeriodicTimer. + * + * Tasks added while there is no delegate timer are stored. + * When a delgate timer is set, stored tasks are added to it. + */ +class DelegatingPeriodicTimer : public sys::PeriodicTimer +{ + public: + DelegatingPeriodicTimer(); + /** Add a task: if no delegate, store it. When delegate set, add stored tasks */ + void add(const Task& task, sys::Duration period, const std::string& taskName); + /** Set the delegate, transfers ownership of delegate. */ + void setDelegate(std::auto_ptr<PeriodicTimer> delegate); + bool hasDelegate() { return delegate.get(); } + private: + struct Entry { Task task; sys::Duration period; std::string name; }; + typedef std::vector<Entry> Entries; + std::auto_ptr<PeriodicTimer> delegate; + Entries entries; + +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PERIODICTIMER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 44b95b75b8..0d0fb7bcee 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -250,9 +250,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : updateRetracted(false), error(*this) { - // FIXME aconway 2010-01-26: must be done before management registers with timer. - broker.setPeriodicTimer( - std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this))); + // We give ownership of the timer to the broker and keep a plain pointer. + // This is OK as it means the timer has the same lifetime as the broker. + timer = new PeriodicTimerImpl(*this); + broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer)); mAgent = broker.getManagementAgent(); if (mAgent != 0){ @@ -448,8 +449,8 @@ void Cluster::flagError( // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& efConst) { - sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. Mutex::ScopedLock l(lock); + sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. if (state == LEFT) return; EventFrame e(efConst); const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); @@ -961,13 +962,13 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu error.respondNone(from, type, frameSeq); } -void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) { - // FIXME aconway 2010-01-26: +void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) { + timer->deliver(name); } bool Cluster::isElder() const { - Mutex::ScopedLock l(lock); - return elders.empty(); + Monitor::ScopedLock l(lock); + return state >= CATCHUP && elders.empty(); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 39c440723f..08911081ea 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -63,6 +63,7 @@ namespace cluster { class Connection; class EventFrame; +class PeriodicTimerImpl; /** * Connection to the cluster @@ -271,6 +272,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool updateRetracted; ErrorCheck error; UpdateReceiver updateReceiver; + PeriodicTimerImpl* timer; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp index 0d6cac8cc9..ced34b572d 100644 --- a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp +++ b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp @@ -32,20 +32,24 @@ PeriodicTimerImpl::TaskEntry::TaskEntry( Cluster& c, const Task& t, sys::Duration d, const std::string& n) : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()), task(t), name(n), inFlight(false) -{} +{ + timer.add(this); +} void PeriodicTimerImpl::TaskEntry::fire() { + setupNextFire(); + timer.add(this); + bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock. sys::Mutex::ScopedLock l(lock); // Only the elder mcasts. // Don't mcast another if we haven't yet received the last one. - if (cluster.isElder() && !inFlight) { + if (isElder && !inFlight) { + QPID_LOG(trace, "Sending periodic-timer control for " << name); inFlight = true; cluster.getMulticast().mcastControl( framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name), cluster.getId()); } - setupNextFire(); - timer.add(this); } void PeriodicTimerImpl::TaskEntry::deliver() { @@ -59,6 +63,7 @@ void PeriodicTimerImpl::add( const Task& task, sys::Duration period, const std::string& name) { sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, "Periodic timer add entry for " << name); if (map.find(name) != map.end()) throw Exception(QPID_MSG("Cluster timer task name added twice: " << name)); map[name] = new TaskEntry(cluster, task, period, name); @@ -72,6 +77,7 @@ void PeriodicTimerImpl::deliver(const std::string& name) { if (i == map.end()) throw Exception(QPID_MSG("Cluster timer unknown task: " << name)); } + QPID_LOG(debug, "Periodic timer execute " << name); i->second->deliver(); } |