summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-27 22:21:28 +0000
committerAlan Conway <aconway@apache.org>2010-01-27 22:21:28 +0000
commit62315ddbbfcd5d41fd674bd2eb62f93d333d9ad4 (patch)
tree1dbb2bd598e496db5779fe420e0dab6d456aa71c /cpp/src
parent863b9e190616873c561a3f468f01e0fc793cd466 (diff)
downloadqpid-python-62315ddbbfcd5d41fd674bd2eb62f93d333d9ad4.tar.gz
QPID_2634 Management updates in timer create inconsistencies in a cluster.
Cluster plugin provides a PeriodicTimer implementation to the broker which executes tasks in the cluster dispatch thread simultaneously across the cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp11
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp50
-rw-r--r--cpp/src/qpid/broker/DelegatingPeriodicTimer.h57
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp17
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/PeriodicTimerImpl.cpp14
9 files changed, 145 insertions, 17 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index fd77e34619..1727de6eb0 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -733,6 +733,7 @@ set (qpidbroker_SOURCES
qpid/broker/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/ConnectionFactory.cpp
+ qpid/broker/DelegatingPeriodicTimer.cpp
qpid/broker/DeliverableMessage.cpp
qpid/broker/DeliveryRecord.cpp
qpid/broker/DirectExchange.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 5886c53ff9..0e1b330245 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -508,6 +508,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Consumer.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
+ qpid/broker/DelegatingPeriodicTimer.h \
+ qpid/broker/DelegatingPeriodicTimer.cpp \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliverableMessage.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index f47b6418bd..cd3b014256 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -137,7 +137,6 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
- periodicTimer(new sys::PeriodicTimerImpl(timer)),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
store(new NullMessageStore),
@@ -258,6 +257,12 @@ Broker::Broker(const Broker::Options& conf) :
// Initialize plugins
Plugin::initializeAll(*this);
+ if (!periodicTimer.hasDelegate()) {
+ // If no plugin has contributed a PeriodicTimer, use the default one.
+ periodicTimer.setDelegate(
+ std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer)));
+ }
+
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
}
@@ -469,6 +474,10 @@ Broker::getKnownBrokersImpl()
return knownBrokers;
}
+void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) {
+ periodicTimer.setDelegate(pt);
+}
+
const std::string Broker::TCP_TRANSPORT("tcp");
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 04d62306da..302ef74e3d 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -25,6 +25,7 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionFactory.h"
#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/DelegatingPeriodicTimer.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -49,7 +50,6 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Timer.h"
-#include "qpid/sys/PeriodicTimer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/sys/Mutex.h"
@@ -147,7 +147,7 @@ public:
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
- std::auto_ptr<sys::PeriodicTimer> periodicTimer;
+ DelegatingPeriodicTimer periodicTimer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -255,8 +255,8 @@ public:
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
sys::Timer& getTimer() { return timer; }
- sys::PeriodicTimer& getPeriodicTimer() { return *periodicTimer; }
- void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { periodicTimer = pt; }
+ sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; }
+ void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt);
boost::function<std::vector<Url> ()> getKnownBrokers;
diff --git a/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp b/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
new file mode 100644
index 0000000000..111d968543
--- /dev/null
+++ b/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DelegatingPeriodicTimer.h"
+
+namespace qpid {
+namespace broker {
+
+DelegatingPeriodicTimer::DelegatingPeriodicTimer() {}
+
+void DelegatingPeriodicTimer::add(
+ const Task& task, sys::Duration period, const std::string& taskName)
+{
+ if (delegate.get())
+ delegate->add(task, period, taskName);
+ else {
+ Entry e;
+ e.task = task;
+ e.period = period;
+ e.name = taskName;
+ entries.push_back(e);
+ }
+}
+
+void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) {
+ assert(impl.get());
+ assert(!delegate.get());
+ delegate = impl;
+ for (Entries::iterator i = entries.begin(); i != entries.end(); ++i)
+ delegate->add(i->task, i->period, i->name);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/DelegatingPeriodicTimer.h b/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
new file mode 100644
index 0000000000..5186f41c3e
--- /dev/null
+++ b/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_PERIODICTIMER_H
+#define QPID_BROKER_PERIODICTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PeriodicTimer.h"
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A PeriodicTimer implementation that delegates to another PeriodicTimer.
+ *
+ * Tasks added while there is no delegate timer are stored.
+ * When a delgate timer is set, stored tasks are added to it.
+ */
+class DelegatingPeriodicTimer : public sys::PeriodicTimer
+{
+ public:
+ DelegatingPeriodicTimer();
+ /** Add a task: if no delegate, store it. When delegate set, add stored tasks */
+ void add(const Task& task, sys::Duration period, const std::string& taskName);
+ /** Set the delegate, transfers ownership of delegate. */
+ void setDelegate(std::auto_ptr<PeriodicTimer> delegate);
+ bool hasDelegate() { return delegate.get(); }
+ private:
+ struct Entry { Task task; sys::Duration period; std::string name; };
+ typedef std::vector<Entry> Entries;
+ std::auto_ptr<PeriodicTimer> delegate;
+ Entries entries;
+
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PERIODICTIMER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 44b95b75b8..0d0fb7bcee 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -250,9 +250,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
updateRetracted(false),
error(*this)
{
- // FIXME aconway 2010-01-26: must be done before management registers with timer.
- broker.setPeriodicTimer(
- std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this)));
+ // We give ownership of the timer to the broker and keep a plain pointer.
+ // This is OK as it means the timer has the same lifetime as the broker.
+ timer = new PeriodicTimerImpl(*this);
+ broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
mAgent = broker.getManagementAgent();
if (mAgent != 0){
@@ -448,8 +449,8 @@ void Cluster::flagError(
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& efConst) {
- sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
Mutex::ScopedLock l(lock);
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
if (state == LEFT) return;
EventFrame e(efConst);
const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
@@ -961,13 +962,13 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
error.respondNone(from, type, frameSeq);
}
-void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) {
- // FIXME aconway 2010-01-26:
+void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
+ timer->deliver(name);
}
bool Cluster::isElder() const {
- Mutex::ScopedLock l(lock);
- return elders.empty();
+ Monitor::ScopedLock l(lock);
+ return state >= CATCHUP && elders.empty();
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 39c440723f..08911081ea 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -63,6 +63,7 @@ namespace cluster {
class Connection;
class EventFrame;
+class PeriodicTimerImpl;
/**
* Connection to the cluster
@@ -271,6 +272,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool updateRetracted;
ErrorCheck error;
UpdateReceiver updateReceiver;
+ PeriodicTimerImpl* timer;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
index 0d6cac8cc9..ced34b572d 100644
--- a/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
+++ b/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
@@ -32,20 +32,24 @@ PeriodicTimerImpl::TaskEntry::TaskEntry(
Cluster& c, const Task& t, sys::Duration d, const std::string& n)
: TimerTask(d), cluster(c), timer(c.getBroker().getTimer()),
task(t), name(n), inFlight(false)
-{}
+{
+ timer.add(this);
+}
void PeriodicTimerImpl::TaskEntry::fire() {
+ setupNextFire();
+ timer.add(this);
+ bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock.
sys::Mutex::ScopedLock l(lock);
// Only the elder mcasts.
// Don't mcast another if we haven't yet received the last one.
- if (cluster.isElder() && !inFlight) {
+ if (isElder && !inFlight) {
+ QPID_LOG(trace, "Sending periodic-timer control for " << name);
inFlight = true;
cluster.getMulticast().mcastControl(
framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name),
cluster.getId());
}
- setupNextFire();
- timer.add(this);
}
void PeriodicTimerImpl::TaskEntry::deliver() {
@@ -59,6 +63,7 @@ void PeriodicTimerImpl::add(
const Task& task, sys::Duration period, const std::string& name)
{
sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Periodic timer add entry for " << name);
if (map.find(name) != map.end())
throw Exception(QPID_MSG("Cluster timer task name added twice: " << name));
map[name] = new TaskEntry(cluster, task, period, name);
@@ -72,6 +77,7 @@ void PeriodicTimerImpl::deliver(const std::string& name) {
if (i == map.end())
throw Exception(QPID_MSG("Cluster timer unknown task: " << name));
}
+ QPID_LOG(debug, "Periodic timer execute " << name);
i->second->deliver();
}