From 8554740df378860da8e2124ea481e464208c4377 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Tue, 30 Mar 2010 21:48:50 +0000 Subject: Merged Alan's cluster changes from trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@929314 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/qpid/broker/DelegatingPeriodicTimer.cpp | 50 ------------- qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h | 57 --------------- qpid/cpp/src/qpid/cluster/Cluster.cpp | 80 +++++++++++---------- qpid/cpp/src/qpid/cluster/Cluster.h | 6 +- qpid/cpp/src/qpid/cluster/ErrorCheck.cpp | 2 +- qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp | 84 ---------------------- qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h | 72 ------------------- qpid/cpp/src/qpid/cluster/PollableQueue.h | 5 +- qpid/cpp/src/qpid/cluster/StoreStatus.h | 3 + qpid/cpp/src/qpid/sys/PeriodicTimer.h | 56 --------------- qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp | 49 ------------- qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h | 48 ------------- qpid/cpp/src/qpid/sys/PollableQueue.h | 2 +- qpid/cpp/src/tests/cluster_tests.py | 7 +- qpid/cpp/src/tests/failover_soak.cpp | 4 +- qpid/cpp/xml/cluster.xml | 4 +- 16 files changed, 66 insertions(+), 463 deletions(-) delete mode 100644 qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp delete mode 100644 qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h delete mode 100644 qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp delete mode 100644 qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h delete mode 100644 qpid/cpp/src/qpid/sys/PeriodicTimer.h delete mode 100644 qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp delete mode 100644 qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h diff --git a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp b/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp deleted file mode 100644 index 111d968543..0000000000 --- a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "DelegatingPeriodicTimer.h" - -namespace qpid { -namespace broker { - -DelegatingPeriodicTimer::DelegatingPeriodicTimer() {} - -void DelegatingPeriodicTimer::add( - const Task& task, sys::Duration period, const std::string& taskName) -{ - if (delegate.get()) - delegate->add(task, period, taskName); - else { - Entry e; - e.task = task; - e.period = period; - e.name = taskName; - entries.push_back(e); - } -} - -void DelegatingPeriodicTimer::setDelegate(std::auto_ptr impl) { - assert(impl.get()); - assert(!delegate.get()); - delegate = impl; - for (Entries::iterator i = entries.begin(); i != entries.end(); ++i) - delegate->add(i->task, i->period, i->name); -} - -}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h b/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h deleted file mode 100644 index 5186f41c3e..0000000000 --- a/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QPID_BROKER_PERIODICTIMER_H -#define QPID_BROKER_PERIODICTIMER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/PeriodicTimer.h" -#include -#include - -namespace qpid { -namespace broker { - -/** - * A PeriodicTimer implementation that delegates to another PeriodicTimer. - * - * Tasks added while there is no delegate timer are stored. - * When a delgate timer is set, stored tasks are added to it. - */ -class DelegatingPeriodicTimer : public sys::PeriodicTimer -{ - public: - DelegatingPeriodicTimer(); - /** Add a task: if no delegate, store it. When delegate set, add stored tasks */ - void add(const Task& task, sys::Duration period, const std::string& taskName); - /** Set the delegate, transfers ownership of delegate. */ - void setDelegate(std::auto_ptr delegate); - bool hasDelegate() { return delegate.get(); } - private: - struct Entry { Task task; sys::Duration period; std::string name; }; - typedef std::vector Entries; - std::auto_ptr delegate; - Entries entries; - -}; - -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_PERIODICTIMER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 6bb597d21f..858900be9e 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { - cluster.configChange(member, current, l); + void configChange(const std::string& members, + const std::string& left, + const std::string& joined) + { + cluster.configChange(member, members, left, joined, l); } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); @@ -316,7 +319,9 @@ void Cluster::initialize() { broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); + deliverEventQueue.bypassOff(); deliverEventQueue.start(); + deliverFrameQueue.bypassOff(); deliverFrameQueue.start(); mcast.start(); @@ -554,40 +559,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { boost::bind(&ConnectionMap::value_type::second, _1)); return result; } - -struct AddrList { - const cpg_address* addrs; - int count; - const char *prefix; - AddrList(const cpg_address* a, int n, const char* p="") - : addrs(a), count(n), prefix(p) {} -}; - -ostream& operator<<(ostream& o, const AddrList& a) { - if (!a.count) return o; - o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) - o << qpid::cluster::MemberId(*p) << " "; - return o; -} +// CPG config-change callback. void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, - const cpg_address *current, int nCurrent, + const cpg_address *members, int nMembers, const cpg_address *left, int nLeft, const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(notice, *this << " membership change: " - << AddrList(current, nCurrent) << "(" - << AddrList(joined, nJoined, "joined: ") - << AddrList(left, nLeft, "left: ") - << ")"); - string addresses; - for (const cpg_address* p = current; p < current+nCurrent; ++p) - addresses.append(MemberId(*p).str()); - deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); + string membersStr, leftStr, joinedStr; + // Encode members and enqueue as an event so the config change can + // be executed in the correct thread. + for (const cpg_address* p = members; p < members+nMembers; ++p) + membersStr.append(MemberId(*p).str()); + for (const cpg_address* p = left; p < left+nLeft; ++p) + leftStr.append(MemberId(*p).str()); + for (const cpg_address* p = joined; p < joined+nJoined; ++p) + joinedStr.append(MemberId(*p).str()); + deliverEvent(Event::control(ClusterConfigChangeBody( + ProtocolVersion(), membersStr, leftStr, joinedStr), + self)); } void Cluster::setReady(Lock&) { @@ -606,6 +599,7 @@ void Cluster::initMapCompleted(Lock& l) { // We decide here whether we want to recover from our store. // We won't recover if we are joining an active cluster or our store is dirty. if (store.hasStore() && + store.getState() != STORE_STATE_EMPTY_STORE && (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) broker.setRecovery(false); // Ditch my current store. state = INIT; @@ -654,22 +648,33 @@ void Cluster::initMapCompleted(Lock& l) { } } -void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) { +void Cluster::configChange(const MemberId&, + const std::string& membersStr, + const std::string& leftStr, + const std::string& joinedStr, + Lock& l) +{ if (state == LEFT) return; + MemberSet members = decodeMemberSet(membersStr); + MemberSet left = decodeMemberSet(leftStr); + MemberSet joined = decodeMemberSet(joinedStr); + QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": " + << members); + QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); + QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - MemberSet config = decodeMemberSet(configStr); - elders = intersection(elders, config); + // Update initital status for members joining or leaving. + elders = intersection(elders, members); if (elders.empty() && INIT < state && state < CATCHUP) { QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } - bool memberChange = map.configChange(config); - QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + bool memberChange = map.configChange(members); store.setConfigSeq(map.getConfigSeq()); // Update initital status for members joining or leaving. - initMap.configChange(config); + initMap.configChange(members); if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( @@ -965,8 +970,11 @@ void Cluster::memberUpdate(Lock& l) { if (store.hasStore()) { // Mark store clean if I am the only broker, dirty otherwise. - if (size == 1) store.clean(Uuid(true)); - else store.dirty(clusterId); + if (size == 1 ) { + if (!store.isClean()) store.clean(Uuid(true)); + } else { + if (!store.isDirty()) store.dirty(clusterId); + } } if (size == 1 && lastSize > 1 && state >= CATCHUP) { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 4a64ad73d6..343a66428b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -163,7 +163,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& firstConfig, Lock&); void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, const std::string& current, Lock& l); + void configChange(const MemberId&, + const std::string& members, + const std::string& left, + const std::string& joined, + Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp index d66db8551d..be671c0f48 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -110,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& const ClusterConfigChangeBody* configChange = static_cast(method); if (configChange) { - MemberSet members(decodeMemberSet(configChange->getCurrent())); + MemberSet members(decodeMemberSet(configChange->getMembers())); QPID_LOG(debug, cluster << " apply config change to error " << frameSeq << ": " << members); MemberSet intersect; diff --git a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp b/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp deleted file mode 100644 index ced34b572d..0000000000 --- a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "PeriodicTimerImpl.h" -#include "Cluster.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/ClusterPeriodicTimerBody.h" - -namespace qpid { -namespace cluster { - -PeriodicTimerImpl::PeriodicTimerImpl(Cluster& c) : cluster(c) {} - -PeriodicTimerImpl::TaskEntry::TaskEntry( - Cluster& c, const Task& t, sys::Duration d, const std::string& n) - : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()), - task(t), name(n), inFlight(false) -{ - timer.add(this); -} - -void PeriodicTimerImpl::TaskEntry::fire() { - setupNextFire(); - timer.add(this); - bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock. - sys::Mutex::ScopedLock l(lock); - // Only the elder mcasts. - // Don't mcast another if we haven't yet received the last one. - if (isElder && !inFlight) { - QPID_LOG(trace, "Sending periodic-timer control for " << name); - inFlight = true; - cluster.getMulticast().mcastControl( - framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name), - cluster.getId()); - } -} - -void PeriodicTimerImpl::TaskEntry::deliver() { - task(); - sys::Mutex::ScopedLock l(lock); - inFlight = false; -} - - -void PeriodicTimerImpl::add( - const Task& task, sys::Duration period, const std::string& name) -{ - sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, "Periodic timer add entry for " << name); - if (map.find(name) != map.end()) - throw Exception(QPID_MSG("Cluster timer task name added twice: " << name)); - map[name] = new TaskEntry(cluster, task, period, name); -} - -void PeriodicTimerImpl::deliver(const std::string& name) { - Map::iterator i; - { - sys::Mutex::ScopedLock l(lock); - i = map.find(name); - if (i == map.end()) - throw Exception(QPID_MSG("Cluster timer unknown task: " << name)); - } - QPID_LOG(debug, "Periodic timer execute " << name); - i->second->deliver(); -} - -}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h b/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h deleted file mode 100644 index cb0034bcef..0000000000 --- a/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef QPID_CLUSTER_PERIODICTIMERIMPL_H -#define QPID_CLUSTER_PERIODICTIMERIMPL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/PeriodicTimer.h" -#include "qpid/sys/Mutex.h" -#include - -namespace qpid { -namespace cluster { - -class Cluster; - -/** - * Cluster implementation of PeriodicTimer. - * - * All members run a periodic task, elder mcasts periodic-timer control. - * Actual task is executed on delivery of periodic-timer. - */ -class PeriodicTimerImpl : public sys::PeriodicTimer -{ - public: - PeriodicTimerImpl(Cluster& cluster); - void add(const Task& task, sys::Duration period, const std::string& taskName); - void deliver(const std::string& name); - - private: - - class TaskEntry : public sys::TimerTask { - public: - TaskEntry(Cluster&, const Task&, sys::Duration period, const std::string& name); - void fire(); - void deliver(); - private: - sys::Mutex lock; - Cluster& cluster; - sys::Timer& timer; - Task task; - std::string name; - bool inFlight; - }; - - typedef std::map > Map; - struct TaskImpl; - - sys::Mutex lock; - Map map; - Cluster& cluster; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_PERIODICTIMER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h index 59d0bcd36a..10e2ed6ac3 100644 --- a/qpid/cpp/src/qpid/cluster/PollableQueue.h +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -74,10 +74,7 @@ template class PollableQueue : public sys::PollableQueue { else sys::PollableQueue::push(t); } - void start() { - bypass = false; - sys::PollableQueue::start(); - } + void bypassOff() { bypass = false; } private: Callback callback; diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h index 2371f0424e..b496fe0dc2 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.h +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h @@ -42,6 +42,9 @@ class StoreStatus StoreStatus(const std::string& dir); framing::cluster::StoreState getState() const { return state; } + bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; } + bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; } + const Uuid& getClusterId() const { return clusterId; } const Uuid& getShutdownId() const { return shutdownId; } framing::SequenceNumber getConfigSeq() const { return configSeq; } diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimer.h b/qpid/cpp/src/qpid/sys/PeriodicTimer.h deleted file mode 100644 index 290ffb6218..0000000000 --- a/qpid/cpp/src/qpid/sys/PeriodicTimer.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef QPID_SYS_PERIODICTIMER_H -#define QPID_SYS_PERIODICTIMER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Timer.h" -#include - -namespace qpid { -namespace sys { - -/** - * Interface of a timer for periodic tasks that should be synchronized - * across all brokers in a periodic. The standalone broker - * implementation simply wraps qpid::sys::Timer. The clustered broker - * implementation synchronizes execution of periodic tasks on all - * periodic members. - */ -class PeriodicTimer -{ - public: - typedef boost::function Task; - - QPID_COMMON_EXTERN virtual ~PeriodicTimer() {} - - /** - * Add a named task to be executed at the given period. - * - * The task registered under the same name will be executed on - * all brokers at the given period. - */ - virtual void add(const Task& task, Duration period, const std::string& taskName) = 0; - -}; -}} // namespace qpid::sys - -#endif /*!QPID_SYS_PERIODICTIMER_H*/ diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp b/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp deleted file mode 100644 index e2a6dccf3e..0000000000 --- a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "PeriodicTimerImpl.h" - -namespace qpid { -namespace sys { - -PeriodicTimerImpl::PeriodicTimerImpl(Timer& t) : timer(t) {} - -struct PeriodicTimerImpl::TaskImpl : public TimerTask { - Timer& timer; - Task task; - - TaskImpl(Timer& timer_, const Task& task_, Duration period) : - TimerTask(period), timer(timer_), task(task_) {} - - void fire() { - task(); - setupNextFire(); - timer.add(this); - } -}; - -void PeriodicTimerImpl::add( - const Task& task, Duration period, const std::string& /*taskName*/ -) -{ - timer.add(new TaskImpl(timer, task, period)); -} - -}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h b/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h deleted file mode 100644 index 3fc9396f01..0000000000 --- a/qpid/cpp/src/qpid/sys/PeriodicTimerImpl.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef QPID_SYS_PERIODICTIMERIMPL_H -#define QPID_SYS_PERIODICTIMERIMPL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "PeriodicTimer.h" -#include "qpid/CommonImportExport.h" - -namespace qpid { -namespace sys { - -/** - * Standalone broker implementation of PeriodicTimer. - */ -class PeriodicTimerImpl : public PeriodicTimer -{ - public: - QPID_COMMON_EXTERN PeriodicTimerImpl(Timer& timer); - QPID_COMMON_EXTERN void add(const Task& task, - Duration period, - const std::string& taskName); - - private: - struct TaskImpl; - Timer& timer; -}; -}} // namespace qpid::sys - -#endif /*!QPID_SYS_PERIODICTIMER_H*/ diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 0786b21610..cb8c126fe6 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -126,7 +126,7 @@ template PollableQueue::~PollableQueue() { template void PollableQueue::push(const T& t) { ScopedLock l(lock); - if (queue.empty()) condition.set(); + if (queue.empty() && !stopped) condition.set(); queue.push_back(t); } diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index fe020786dc..0c4b0350a7 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -404,12 +404,17 @@ class StoreTests(BrokerTest): c = cluster.start("c", expect=EXPECT_EXIT_FAIL) a.send_message("q", Message("x", durable=True)) a.kill() + # FIXME aconway 2010-03-29: this test has too many sleeps. + # Need to tighten up status persistence to be more atomic and less + # prone to interruption. + time.sleep(0.1) # pause for b to update status. b.kill() # c is last man time.sleep(0.1) # pause for c to find out hes last. a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man + time.sleep(0.1) # pause for c to find out hes no longer last. c.kill() # a is now last man time.sleep(0.1) # pause for a to find out hes last. - a.kill() # really last + a.kill() # really last, should be clean. # b & c should be dirty b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) self.assert_dirty_store(b) diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp index 8bf6eca9e6..cd7aaa6614 100644 --- a/qpid/cpp/src/tests/failover_soak.cpp +++ b/qpid/cpp/src/tests/failover_soak.cpp @@ -337,7 +337,6 @@ startNewBroker ( brokerVector & brokers, int verbosity, int durable ) { - // ("--log-enable=notice+") static int brokerId = 0; stringstream path, prefix; prefix << "soak-" << brokerId; @@ -348,7 +347,8 @@ startNewBroker ( brokerVector & brokers, ("--mgmt-enable=no") ("--log-prefix")(prefix.str()) ("--log-to-file")(prefix.str()+".log") - ("--log-enable=notice+") + ("--log-enable=info+") + ("--log-enable=debug+:cluster") ("TMP_DATA_DIR"); if (endsWith(moduleOrDir, "cluster.so")) { diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 8f94eb3fd1..c4750a3195 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -73,7 +73,9 @@ - + + + -- cgit v1.2.1