summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-30 21:48:50 +0000
committerTed Ross <tross@apache.org>2010-03-30 21:48:50 +0000
commit8554740df378860da8e2124ea481e464208c4377 (patch)
treefa274e8eea9e37abb511933a613721d1f53036e3
parent153da67fa601a2f20da496b1982469f67ba7ea0a (diff)
downloadqpid-python-qmf-devel0.7a.tar.gz
Merged Alan's cluster changes from trunk.qmf-devel0.7a
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@929314 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp50
-rw-r--r--qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h57
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp80
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp84
-rw-r--r--qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h72
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.h3
-rw-r--r--qpid/cpp/src/qpid/sys/PeriodicTimer.h56
-rw-r--r--qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp49
-rw-r--r--qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h48
-rw-r--r--qpid/cpp/src/qpid/sys/PollableQueue.h2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py7
-rw-r--r--qpid/cpp/src/tests/failover_soak.cpp4
-rw-r--r--qpid/cpp/xml/cluster.xml4
16 files changed, 66 insertions, 463 deletions
diff --git a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp b/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
deleted file mode 100644
index 111d968543..0000000000
--- a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "DelegatingPeriodicTimer.h"
-
-namespace qpid {
-namespace broker {
-
-DelegatingPeriodicTimer::DelegatingPeriodicTimer() {}
-
-void DelegatingPeriodicTimer::add(
- const Task& task, sys::Duration period, const std::string& taskName)
-{
- if (delegate.get())
- delegate->add(task, period, taskName);
- else {
- Entry e;
- e.task = task;
- e.period = period;
- e.name = taskName;
- entries.push_back(e);
- }
-}
-
-void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) {
- assert(impl.get());
- assert(!delegate.get());
- delegate = impl;
- for (Entries::iterator i = entries.begin(); i != entries.end(); ++i)
- delegate->add(i->task, i->period, i->name);
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h b/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
deleted file mode 100644
index 5186f41c3e..0000000000
--- a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef QPID_BROKER_PERIODICTIMER_H
-#define QPID_BROKER_PERIODICTIMER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/PeriodicTimer.h"
-#include <vector>
-#include <memory>
-
-namespace qpid {
-namespace broker {
-
-/**
- * A PeriodicTimer implementation that delegates to another PeriodicTimer.
- *
- * Tasks added while there is no delegate timer are stored.
- * When a delgate timer is set, stored tasks are added to it.
- */
-class DelegatingPeriodicTimer : public sys::PeriodicTimer
-{
- public:
- DelegatingPeriodicTimer();
- /** Add a task: if no delegate, store it. When delegate set, add stored tasks */
- void add(const Task& task, sys::Duration period, const std::string& taskName);
- /** Set the delegate, transfers ownership of delegate. */
- void setDelegate(std::auto_ptr<PeriodicTimer> delegate);
- bool hasDelegate() { return delegate.get(); }
- private:
- struct Entry { Task task; sys::Duration period; std::string name; };
- typedef std::vector<Entry> Entries;
- std::auto_ptr<PeriodicTimer> delegate;
- Entries entries;
-
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 6bb597d21f..858900be9e 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) {
cluster.ready(member, url, l);
}
- void configChange(const std::string& current) {
- cluster.configChange(member, current, l);
+ void configChange(const std::string& members,
+ const std::string& left,
+ const std::string& joined)
+ {
+ cluster.configChange(member, members, left, joined, l);
}
void updateOffer(uint64_t updatee) {
cluster.updateOffer(member, updatee, l);
@@ -316,7 +319,9 @@ void Cluster::initialize() {
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
+ deliverEventQueue.bypassOff();
deliverEventQueue.start();
+ deliverFrameQueue.bypassOff();
deliverFrameQueue.start();
mcast.start();
@@ -554,40 +559,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
boost::bind(&ConnectionMap::value_type::second, _1));
return result;
}
-
-struct AddrList {
- const cpg_address* addrs;
- int count;
- const char *prefix;
- AddrList(const cpg_address* a, int n, const char* p="")
- : addrs(a), count(n), prefix(p) {}
-};
-
-ostream& operator<<(ostream& o, const AddrList& a) {
- if (!a.count) return o;
- o << a.prefix;
- for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
- o << qpid::cluster::MemberId(*p) << " ";
- return o;
-}
+// CPG config-change callback.
void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
- const cpg_address *current, int nCurrent,
+ const cpg_address *members, int nMembers,
const cpg_address *left, int nLeft,
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(notice, *this << " membership change: "
- << AddrList(current, nCurrent) << "("
- << AddrList(joined, nJoined, "joined: ")
- << AddrList(left, nLeft, "left: ")
- << ")");
- string addresses;
- for (const cpg_address* p = current; p < current+nCurrent; ++p)
- addresses.append(MemberId(*p).str());
- deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+ string membersStr, leftStr, joinedStr;
+ // Encode members and enqueue as an event so the config change can
+ // be executed in the correct thread.
+ for (const cpg_address* p = members; p < members+nMembers; ++p)
+ membersStr.append(MemberId(*p).str());
+ for (const cpg_address* p = left; p < left+nLeft; ++p)
+ leftStr.append(MemberId(*p).str());
+ for (const cpg_address* p = joined; p < joined+nJoined; ++p)
+ joinedStr.append(MemberId(*p).str());
+ deliverEvent(Event::control(ClusterConfigChangeBody(
+ ProtocolVersion(), membersStr, leftStr, joinedStr),
+ self));
}
void Cluster::setReady(Lock&) {
@@ -606,6 +599,7 @@ void Cluster::initMapCompleted(Lock& l) {
// We decide here whether we want to recover from our store.
// We won't recover if we are joining an active cluster or our store is dirty.
if (store.hasStore() &&
+ store.getState() != STORE_STATE_EMPTY_STORE &&
(initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
broker.setRecovery(false); // Ditch my current store.
state = INIT;
@@ -654,22 +648,33 @@ void Cluster::initMapCompleted(Lock& l) {
}
}
-void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
+void Cluster::configChange(const MemberId&,
+ const std::string& membersStr,
+ const std::string& leftStr,
+ const std::string& joinedStr,
+ Lock& l)
+{
if (state == LEFT) return;
+ MemberSet members = decodeMemberSet(membersStr);
+ MemberSet left = decodeMemberSet(leftStr);
+ MemberSet joined = decodeMemberSet(joinedStr);
+ QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
+ << members);
+ QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+ QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- MemberSet config = decodeMemberSet(configStr);
- elders = intersection(elders, config);
+ // Update initital status for members joining or leaving.
+ elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
leave(l);
return;
}
- bool memberChange = map.configChange(config);
- QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+ bool memberChange = map.configChange(members);
store.setConfigSeq(map.getConfigSeq());
// Update initital status for members joining or leaving.
- initMap.configChange(config);
+ initMap.configChange(members);
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
@@ -965,8 +970,11 @@ void Cluster::memberUpdate(Lock& l) {
if (store.hasStore()) {
// Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1) store.clean(Uuid(true));
- else store.dirty(clusterId);
+ if (size == 1 ) {
+ if (!store.isClean()) store.clean(Uuid(true));
+ } else {
+ if (!store.isDirty()) store.dirty(clusterId);
+ }
}
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 4a64ad73d6..343a66428b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -163,7 +163,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string& firstConfig,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& current, Lock& l);
+ void configChange(const MemberId&,
+ const std::string& members,
+ const std::string& left,
+ const std::string& joined,
+ Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
index d66db8551d..be671c0f48 100644
--- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -110,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
const ClusterConfigChangeBody* configChange =
static_cast<const ClusterConfigChangeBody*>(method);
if (configChange) {
- MemberSet members(decodeMemberSet(configChange->getCurrent()));
+ MemberSet members(decodeMemberSet(configChange->getMembers()));
QPID_LOG(debug, cluster << " apply config change to error "
<< frameSeq << ": " << members);
MemberSet intersect;
diff --git a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
deleted file mode 100644
index ced34b572d..0000000000
--- a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PeriodicTimerImpl.h"
-#include "Cluster.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/ClusterPeriodicTimerBody.h"
-
-namespace qpid {
-namespace cluster {
-
-PeriodicTimerImpl::PeriodicTimerImpl(Cluster& c) : cluster(c) {}
-
-PeriodicTimerImpl::TaskEntry::TaskEntry(
- Cluster& c, const Task& t, sys::Duration d, const std::string& n)
- : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()),
- task(t), name(n), inFlight(false)
-{
- timer.add(this);
-}
-
-void PeriodicTimerImpl::TaskEntry::fire() {
- setupNextFire();
- timer.add(this);
- bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock.
- sys::Mutex::ScopedLock l(lock);
- // Only the elder mcasts.
- // Don't mcast another if we haven't yet received the last one.
- if (isElder && !inFlight) {
- QPID_LOG(trace, "Sending periodic-timer control for " << name);
- inFlight = true;
- cluster.getMulticast().mcastControl(
- framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name),
- cluster.getId());
- }
-}
-
-void PeriodicTimerImpl::TaskEntry::deliver() {
- task();
- sys::Mutex::ScopedLock l(lock);
- inFlight = false;
-}
-
-
-void PeriodicTimerImpl::add(
- const Task& task, sys::Duration period, const std::string& name)
-{
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "Periodic timer add entry for " << name);
- if (map.find(name) != map.end())
- throw Exception(QPID_MSG("Cluster timer task name added twice: " << name));
- map[name] = new TaskEntry(cluster, task, period, name);
-}
-
-void PeriodicTimerImpl::deliver(const std::string& name) {
- Map::iterator i;
- {
- sys::Mutex::ScopedLock l(lock);
- i = map.find(name);
- if (i == map.end())
- throw Exception(QPID_MSG("Cluster timer unknown task: " << name));
- }
- QPID_LOG(debug, "Periodic timer execute " << name);
- i->second->deliver();
-}
-
-}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h b/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h
deleted file mode 100644
index cb0034bcef..0000000000
--- a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef QPID_CLUSTER_PERIODICTIMERIMPL_H
-#define QPID_CLUSTER_PERIODICTIMERIMPL_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/PeriodicTimer.h"
-#include "qpid/sys/Mutex.h"
-#include <map>
-
-namespace qpid {
-namespace cluster {
-
-class Cluster;
-
-/**
- * Cluster implementation of PeriodicTimer.
- *
- * All members run a periodic task, elder mcasts periodic-timer control.
- * Actual task is executed on delivery of periodic-timer.
- */
-class PeriodicTimerImpl : public sys::PeriodicTimer
-{
- public:
- PeriodicTimerImpl(Cluster& cluster);
- void add(const Task& task, sys::Duration period, const std::string& taskName);
- void deliver(const std::string& name);
-
- private:
-
- class TaskEntry : public sys::TimerTask {
- public:
- TaskEntry(Cluster&, const Task&, sys::Duration period, const std::string& name);
- void fire();
- void deliver();
- private:
- sys::Mutex lock;
- Cluster& cluster;
- sys::Timer& timer;
- Task task;
- std::string name;
- bool inFlight;
- };
-
- typedef std::map<std::string, boost::intrusive_ptr<TaskEntry> > Map;
- struct TaskImpl;
-
- sys::Mutex lock;
- Map map;
- Cluster& cluster;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index 59d0bcd36a..10e2ed6ac3 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -74,10 +74,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
else sys::PollableQueue<T>::push(t);
}
- void start() {
- bypass = false;
- sys::PollableQueue<T>::start();
- }
+ void bypassOff() { bypass = false; }
private:
Callback callback;
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
index 2371f0424e..b496fe0dc2 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.h
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -42,6 +42,9 @@ class StoreStatus
StoreStatus(const std::string& dir);
framing::cluster::StoreState getState() const { return state; }
+ bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
+ bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
+
const Uuid& getClusterId() const { return clusterId; }
const Uuid& getShutdownId() const { return shutdownId; }
framing::SequenceNumber getConfigSeq() const { return configSeq; }
diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimer.h b/qpid/cpp/src/qpid/sys/PeriodicTimer.h
deleted file mode 100644
index 290ffb6218..0000000000
--- a/qpid/cpp/src/qpid/sys/PeriodicTimer.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#ifndef QPID_SYS_PERIODICTIMER_H
-#define QPID_SYS_PERIODICTIMER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Timer.h"
-#include <boost/function.hpp>
-
-namespace qpid {
-namespace sys {
-
-/**
- * Interface of a timer for periodic tasks that should be synchronized
- * across all brokers in a periodic. The standalone broker
- * implementation simply wraps qpid::sys::Timer. The clustered broker
- * implementation synchronizes execution of periodic tasks on all
- * periodic members.
- */
-class PeriodicTimer
-{
- public:
- typedef boost::function<void()> Task;
-
- QPID_COMMON_EXTERN virtual ~PeriodicTimer() {}
-
- /**
- * Add a named task to be executed at the given period.
- *
- * The task registered under the same name will be executed on
- * all brokers at the given period.
- */
- virtual void add(const Task& task, Duration period, const std::string& taskName) = 0;
-
-};
-}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp b/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp
deleted file mode 100644
index e2a6dccf3e..0000000000
--- a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PeriodicTimerImpl.h"
-
-namespace qpid {
-namespace sys {
-
-PeriodicTimerImpl::PeriodicTimerImpl(Timer& t) : timer(t) {}
-
-struct PeriodicTimerImpl::TaskImpl : public TimerTask {
- Timer& timer;
- Task task;
-
- TaskImpl(Timer& timer_, const Task& task_, Duration period) :
- TimerTask(period), timer(timer_), task(task_) {}
-
- void fire() {
- task();
- setupNextFire();
- timer.add(this);
- }
-};
-
-void PeriodicTimerImpl::add(
- const Task& task, Duration period, const std::string& /*taskName*/
-)
-{
- timer.add(new TaskImpl(timer, task, period));
-}
-
-}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h b/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h
deleted file mode 100644
index 3fc9396f01..0000000000
--- a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef QPID_SYS_PERIODICTIMERIMPL_H
-#define QPID_SYS_PERIODICTIMERIMPL_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "PeriodicTimer.h"
-#include "qpid/CommonImportExport.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Standalone broker implementation of PeriodicTimer.
- */
-class PeriodicTimerImpl : public PeriodicTimer
-{
- public:
- QPID_COMMON_EXTERN PeriodicTimerImpl(Timer& timer);
- QPID_COMMON_EXTERN void add(const Task& task,
- Duration period,
- const std::string& taskName);
-
- private:
- struct TaskImpl;
- Timer& timer;
-};
-}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_PERIODICTIMER_H*/
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h
index 0786b21610..cb8c126fe6 100644
--- a/qpid/cpp/src/qpid/sys/PollableQueue.h
+++ b/qpid/cpp/src/qpid/sys/PollableQueue.h
@@ -126,7 +126,7 @@ template <class T> PollableQueue<T>::~PollableQueue() {
template <class T> void PollableQueue<T>::push(const T& t) {
ScopedLock l(lock);
- if (queue.empty()) condition.set();
+ if (queue.empty() && !stopped) condition.set();
queue.push_back(t);
}
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index fe020786dc..0c4b0350a7 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -404,12 +404,17 @@ class StoreTests(BrokerTest):
c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
a.send_message("q", Message("x", durable=True))
a.kill()
+ # FIXME aconway 2010-03-29: this test has too many sleeps.
+ # Need to tighten up status persistence to be more atomic and less
+ # prone to interruption.
+ time.sleep(0.1) # pause for b to update status.
b.kill() # c is last man
time.sleep(0.1) # pause for c to find out hes last.
a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
+ time.sleep(0.1) # pause for c to find out hes no longer last.
c.kill() # a is now last man
time.sleep(0.1) # pause for a to find out hes last.
- a.kill() # really last
+ a.kill() # really last, should be clean.
# b & c should be dirty
b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
self.assert_dirty_store(b)
diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp
index 8bf6eca9e6..cd7aaa6614 100644
--- a/qpid/cpp/src/tests/failover_soak.cpp
+++ b/qpid/cpp/src/tests/failover_soak.cpp
@@ -337,7 +337,6 @@ startNewBroker ( brokerVector & brokers,
int verbosity,
int durable )
{
- // ("--log-enable=notice+")
static int brokerId = 0;
stringstream path, prefix;
prefix << "soak-" << brokerId;
@@ -348,7 +347,8 @@ startNewBroker ( brokerVector & brokers,
("--mgmt-enable=no")
("--log-prefix")(prefix.str())
("--log-to-file")(prefix.str()+".log")
- ("--log-enable=notice+")
+ ("--log-enable=info+")
+ ("--log-enable=debug+:cluster")
("TMP_DATA_DIR");
if (endsWith(moduleOrDir, "cluster.so")) {
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 8f94eb3fd1..c4750a3195 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -73,7 +73,9 @@
</control>
<control name="config-change" code="0x11" label="Raw cluster membership.">
- <field name="current" type="vbin16"/> <!-- packed member-id array -->
+ <field name="members" type="vbin16"/> <!-- packed member-id array -->
+ <field name="joined" type="vbin16"/> <!-- packed member-id array -->
+ <field name="left" type="vbin16"/> <!-- packed member-id array -->
</control>
<control name="message-expired" code="0x12">