diff options
author | Alan Conway <aconway@apache.org> | 2010-03-12 20:11:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-03-12 20:11:31 +0000 |
commit | ef9268528d3147173dfb0d2ef707ee3e4fc4f210 (patch) | |
tree | 4d8a9851683812bd04392f57c695a5143c80ca79 | |
parent | 937fe6e7295efff28cb680642fca28ebf65e7d4e (diff) | |
download | qpid-python-ef9268528d3147173dfb0d2ef707ee3e4fc4f210.tar.gz |
New cluster member pushes store when joining an active cluster.
Previously a broker with a clean store would not be able to join an
active cluster because the shtudown-id did not match. This commit
ensures that when a broker joins an active cluster, it always pushes
its store regardless of status. Clean/dirty status is only compared
when forming an initial cluster.
This change required splitting initialization into two phases:
PRE_INIT: occurs in the Cluster ctor during early-initialize. This
phase determines whether or not to push the store.
INIT: occurs after Cluster::initialize and does the remaining
initialization chores.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@922412 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/IncompleteMessageList.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 91 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/PollableQueue.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.cpp | 39 | ||||
-rw-r--r-- | cpp/src/tests/InitialStatusMap.cpp | 16 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 70 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 9 |
12 files changed, 242 insertions, 96 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp index a061e872d0..34d92fa752 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -79,7 +79,7 @@ void IncompleteMessageList::each(const CompletionListener& listen) { sys::Mutex::ScopedLock l(lock); snapshot = incomplete; } - std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value? + std::for_each(incomplete.begin(), incomplete.end(), listen); } }} diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 92e2b65fe2..f8a875a30c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -16,7 +16,8 @@ * */ -/** CLUSTER IMPLEMENTATION OVERVIEW +/** + * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> * * The cluster works on the principle that if all members of the * cluster receive identical input, they will all produce identical @@ -41,12 +42,15 @@ * * The following are the current areas where broker uses timers or timestamps: * - * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput. - * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState + * - Producer flow control: broker::SemanticState uses + * connection::getClusterOrderOutput. a FrameHandler that sends + * frames to the client via the cluster. Used by broker::SessionState * - * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy. + * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is + * implemented by cluster::ExpiryPolicy. * - * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore. + * - Connection heartbeat: sends connection controls, not part of + * session command counting so OK to ignore. * * - LinkRegistry: only cluster elder is ever active for links. * @@ -57,7 +61,10 @@ * * cluster::ExpiryPolicy implements the strategy for message expiry. * - * CLUSTER PROTOCOL OVERVIEW + * ClusterTimer implements periodic timed events in the cluster context. + * Used for periodic management events. + * + * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * * Messages sent to/from CPG are called Events. * @@ -84,12 +91,16 @@ * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. * - * CLUSTER INITIALIZATION OVERVIEW + * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> + * + * @see InitialStatusMap * * When a new member joins the CPG group, all members (including the * new one) multicast their "initial status." The new member is in - * INIT mode until it gets a complete set of initial status messages - * from all cluster members. + * PRE_INIT mode until it gets a complete set of initial status + * messages from all cluster members. In a newly-forming cluster is + * then in INIT mode until the configured cluster-size members have + * joined. * * The newcomer uses initial status to determine * - The cluster UUID @@ -97,11 +108,16 @@ * - Do I need to get an update from an existing active member? * - Can I recover from my own store? * - * Initialization happens in the Cluster constructor (plugin - * early-init phase) because it needs to be done before the store - * initializes. In INIT mode sending & receiving from the cluster are - * done single-threaded, bypassing the normal PollableQueues because - * the Poller is not active at this point to service them. + * Pre-initialization happens in the Cluster constructor (plugin + * early-init phase) because it needs to set the recovery flag before + * the store initializes. This phase lasts until inital-status is + * received for all active members. The PollableQueues and Multicaster + * are in "bypass" mode during this phase since the poller has not + * started so there are no threads to serve pollable queues. + * + * The remaining initialization happens in Cluster::initialize() or, + * if cluster-size=N is specified, in the deliver thread when an + * initial-status control is delivered that brings the total to N. */ #include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" @@ -244,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), - state(INIT), + state(PRE_INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), elder(false), @@ -274,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - if (store.getState() == STORE_STATE_DIRTY_STORE) - broker.setRecovery(false); // Ditch my current store. if (store.getClusterId()) clusterId = store.getClusterId(); // Use stored ID if there is one. QPID_LOG(notice, "Cluster store state: " << store) } - cpg.join(name); + // pump the CPG dispatch manually till we get past PRE_INIT. + while (state == PRE_INIT) + cpg.dispatchOne(); } Cluster::~Cluster() { @@ -301,9 +318,14 @@ void Cluster::initialize() { dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); + mcast.start(); + + // Run initMapCompleted immediately to process the initial configuration. + assert(state == INIT); + initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. // Add finalizer last for exception safety. - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -579,9 +601,27 @@ void Cluster::setReady(Lock&) { void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); - if (state == INIT) { - // We have status for all members so we can make join descisions. + if (state == PRE_INIT) { + // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. + // We decide here whether we want to recover from our store. + // We won't recover if we are joining an active cluster or our store is dirty. + if (store.hasStore() && + (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) + broker.setRecovery(false); // Ditch my current store. + state = INIT; + } + else if (state == INIT) { + // INIT means we are past Cluster::initialize(). + + // If we're forming an initial cluster (no active members) + // then we wait to reach the configured cluster-size + if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { + QPID_LOG(info, *this << initMap.getActualSize() + << " members, waiting for at least " << initMap.getRequiredSize()); + return; + } initMap.checkConsistent(); + elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (elders.empty()) @@ -969,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) { std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { - "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", + "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; @@ -1009,12 +1050,14 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - timer->deliverWakeup(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - timer->deliverDrop(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverDrop(name); } bool Cluster::isElder() const { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e280a7e928..4a64ad73d6 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -180,6 +180,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); void erase(const ConnectionId&, Lock&); + void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); @@ -251,7 +252,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Local cluster state, cluster map enum { - INIT, ///< Establishing inital cluster stattus. + PRE_INIT,///< Have not yet received complete initial status map. + INIT, ///< Waiting to reach cluster-size. JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index a1a1456618..c8ecc13f2c 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -86,8 +86,7 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) { } bool InitialStatusMap::isComplete() const { - return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end() - && (map.size() >= size); + return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end(); } bool InitialStatusMap::transitionToComplete() { @@ -100,7 +99,7 @@ bool InitialStatusMap::isResendNeeded() { return ret; } -bool InitialStatusMap::isActive(const Map::value_type& v) { +bool InitialStatusMap::isActiveEntry(const Map::value_type& v) { return v.second && v.second->getActive(); } @@ -110,10 +109,15 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) { v.second->getStoreState() == STORE_STATE_DIRTY_STORE); } +bool InitialStatusMap::isActive() { + assert(isComplete()); + return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end()); +} + bool InitialStatusMap::isUpdateNeeded() { assert(isComplete()); // We need an update if there are any active members. - if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true; + if (isActive()) return true; // Otherwise it depends on store status, get my own status: Map::iterator me = map.find(self); @@ -154,7 +158,7 @@ MemberSet InitialStatusMap::getElders() const { Uuid InitialStatusMap::getClusterId() { assert(isComplete()); assert(!map.empty()); - Map::iterator i = find_if(map.begin(), map.end(), &isActive); + Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry); if (i != map.end()) return i->second->getClusterId(); // An active member else @@ -178,6 +182,7 @@ void InitialStatusMap::checkConsistent() { Uuid clusterId; Uuid shutdownId; + bool initialCluster = !isActive(); for (Map::iterator i = map.begin(); i != map.end(); ++i) { assert(i->second); if (i->second->getActive()) ++active; @@ -193,8 +198,10 @@ void InitialStatusMap::checkConsistent() { ++clean; checkId(clusterId, i->second->getClusterId(), "Cluster-ID mismatch. Stores belong to different clusters."); - checkId(shutdownId, i->second->getShutdownId(), - "Shutdown-ID mismatch. Stores were not shut down together"); + // Only need shutdownId to match if we are in an initially forming cluster. + if (initialCluster) + checkId(shutdownId, i->second->getShutdownId(), + "Shutdown-ID mismatch. Stores were not shut down together"); break; } } @@ -202,10 +209,13 @@ void InitialStatusMap::checkConsistent() { if (none && (clean+dirty+empty)) throw Exception("Mixing transient and persistent brokers in a cluster"); - // If there are no active members and there are dirty stores there - // must be at least one clean store. - if (!active && dirty && !clean) - throw Exception("Cannot recover, no clean store."); + if (map.size() >= size) { + // All initial members are present. If there are no active + // members and there are dirty stores there must be at least + // one clean store. + if (!active && dirty && !clean) + throw Exception("Cannot recover, no clean store."); + } } std::string InitialStatusMap::getFirstConfigStr() const { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index eedc99b0b2..a5a600365e 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -51,12 +51,18 @@ class InitialStatusMap /** Process received status */ void received(const MemberId&, const Status& is); - /**@return true if the map is complete. */ + /**@return true if the map has an entry for all current cluster members. */ bool isComplete() const; + + size_t getActualSize() const { return map.size(); } + size_t getRequiredSize() const { return size; } + /**@return true if the map was completed by the last config change or received. */ bool transitionToComplete(); /**@pre isComplete(). @return this node's elders */ MemberSet getElders() const; + /**@pre isComplete(). @return True if there are active members of the cluster. */ + bool isActive(); /**@pre isComplete(). @return True if we need to request an update. */ bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. */ @@ -71,8 +77,9 @@ class InitialStatusMap private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); - static bool isActive(const Map::value_type&); + static bool isActiveEntry(const Map::value_type&); static bool hasStore(const Map::value_type&); + Map map; MemberSet firstConfig; MemberId self; diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 4a8195438f..d57ff76941 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_, boost::function<void()> onError_) : onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - ready(false) -{ - queue.start(); -} + ready(false), bypass(true) +{} void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { mcast(Event::control(body, id)); @@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) { } } QPID_LOG(trace, "MCAST " << e); - queue.push(e); + if (bypass) { // direct, don't queue + iovec iov = e.toIovec(); + // FIXME aconway 2010-03-10: should do limited retry. + while (!cpg.mcast(&iov, 1)) + ; + } + else + queue.push(e); } - Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { PollableEventQueue::Batch::const_iterator i = values.begin(); @@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co } } +void Multicaster::start() { + queue.start(); + bypass = false; +} + void Multicaster::setReady() { sys::Mutex::ScopedLock l(lock); ready = true; diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index 2db84a9ce0..f70bd5ca31 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -41,16 +41,18 @@ class Cpg; /** * Multicast to the cluster. Shared, thread safe object. - * - * Runs in two modes; * - * initializing: Hold connection mcast events. Multicast cluster - * events directly in the calling thread. This mode is used before - * joining the cluster where the poller may not yet be active and we - * want to hold any connection traffic till we join. + * holding mode: Hold connection events for later multicast. Cluster + * events are never held. Used during PRE_INIT/INIT state when we + * want to hold any connection traffic till we are read in the + * cluster. + * + * bypass mode: Multicast cluster events directly in the calling + * thread. This mode is used by cluster in PRE_INIT state the poller + * is not yet be active. * - * ready: normal operation. Queues all mcasts on a pollable queue, - * multicasts connection and cluster events. + * Multicaster is created in bypass+holding mode, they are disabled by + * start and setReady respectively. */ class Multicaster { @@ -65,7 +67,9 @@ class Multicaster void mcastBuffer(const char*, size_t, const ConnectionId&); void mcast(const Event& e); - /** Switch to ready mode. */ + /** Start the pollable queue, turn off bypass mode. */ + void start(); + /** Switch to ready mode, release held messages. */ void setReady(); private: @@ -81,6 +85,7 @@ class Multicaster bool ready; PlainEventQueue holdingQueue; std::vector<struct ::iovec> ioVector; + bool bypass; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h index 2aed6de5b9..59d0bcd36a 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/cluster/PollableQueue.h @@ -31,6 +31,13 @@ namespace cluster { /** * More convenient version of PollableQueue that handles iterating * over the batch and error handling. + * + * Constructed in "bypass" mode where items are processed directly + * rather than put on the queue. This is important for the + * PRE_INIT stage when Cluster is pumping CPG dispatch directly + * before the poller has started. + * + * Calling start() starts the pollable queue and disabled bypass mode. */ template <class T> class PollableQueue : public sys::PollableQueue<T> { public: @@ -41,7 +48,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { const boost::shared_ptr<sys::Poller>& poller) : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller), - callback(f), error(err), message(msg) + callback(f), error(err), message(msg), bypass(true) {} typename sys::PollableQueue<T>::Batch::const_iterator @@ -62,10 +69,21 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> { } } + void push(const T& t) { + if (bypass) callback(t); + else sys::PollableQueue<T>::push(t); + } + + void start() { + bypass = false; + sys::PollableQueue<T>::start(); + } + private: Callback callback; ErrorCallback error; std::string message; + bool bypass; }; diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp index 648fcfbbd5..b44c0e1a9a 100644 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/cpp/src/qpid/cluster/StoreStatus.cpp @@ -21,6 +21,7 @@ #include "StoreStatus.h" #include "qpid/Exception.h" #include "qpid/Msg.h" +#include "qpid/log/Statement.h" #include <boost/filesystem/path.hpp> #include <boost/filesystem/fstream.hpp> #include <boost/filesystem/operations.hpp> @@ -54,24 +55,39 @@ Uuid loadUuid(const fs::path& path) { Uuid ret; if (exists(path)) { fs::ifstream i(path); - throw_exceptions(i); - i >> ret; + try { + throw_exceptions(i); + i >> ret; + } catch (const std::exception& e) { + QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what()); + throw; + } } return ret; } void saveUuid(const fs::path& path, const Uuid& uuid) { fs::ofstream o(path); - throw_exceptions(o); - o << uuid; + try { + throw_exceptions(o); + o << uuid; + } catch (const std::exception& e) { + QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what()); + throw; + } } framing::SequenceNumber loadSeqNum(const fs::path& path) { uint32_t n = 0; if (exists(path)) { fs::ifstream i(path); - throw_exceptions(i); - i >> n; + try { + throw_exceptions(i); + i >> n; + } catch (const std::exception& e) { + QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what()); + throw; + } } return framing::SequenceNumber(n); } @@ -105,9 +121,14 @@ void StoreStatus::save() { create_directory(dir); saveUuid(dir/CLUSTER_ID_FILE, clusterId); saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId); - fs::ofstream o(dir/CONFIG_SEQ_FILE); - throw_exceptions(o); - o << configSeq.getValue(); + try { + fs::ofstream o(dir/CONFIG_SEQ_FILE); + throw_exceptions(o); + o << configSeq.getValue(); + } catch (const std::exception& e) { + QPID_LOG(error, "Cant save sequence number to " << (dir/CONFIG_SEQ_FILE).string() << ": " << e.what()); + throw; + } } catch (const std::exception&e) { throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what())); diff --git a/cpp/src/tests/InitialStatusMap.cpp b/cpp/src/tests/InitialStatusMap.cpp index 91c95ac517..ecbe2d4161 100644 --- a/cpp/src/tests/InitialStatusMap.cpp +++ b/cpp/src/tests/InitialStatusMap.cpp @@ -173,20 +173,6 @@ QPID_AUTO_TEST_CASE(testInteveningConfig) { BOOST_CHECK_EQUAL(map.getClusterId(), id); } -QPID_AUTO_TEST_CASE(testInitialSize) { - InitialStatusMap map(MemberId(0), 3); - map.configChange(list_of<MemberId>(0)(1)); - map.received(MemberId(0), newcomerStatus()); - map.received(MemberId(1), newcomerStatus()); - BOOST_CHECK(!map.isComplete()); - - map.configChange(list_of<MemberId>(0)(1)(2)); - map.received(MemberId(0), newcomerStatus()); - map.received(MemberId(1), newcomerStatus()); - map.received(MemberId(2), newcomerStatus()); - BOOST_CHECK(map.isComplete()); -} - QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) { InitialStatusMap map(MemberId(0), 3); map.configChange(list_of<MemberId>(0)(1)(2)); @@ -244,8 +230,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) { BOOST_CHECK(!map.isUpdateNeeded()); } -// FIXME aconway 2009-11-20: consistency tests for mixed stores, - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 08f1697c7a..4fefe26db3 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -29,9 +29,19 @@ from itertools import chain log = getLogger("qpid.cluster_tests") +# Note: brokers that shut themselves down due to critical error during +# normal operation will still have an exit code of 0. Brokers that +# shut down because of an error found during initialize will exit with +# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK +# and EXPECT_EXIT_FAIL in some of the tests below. + +# FIXME aconway 2010-03-11: resolve this - ideally any exit due to an error +# should give non-0 exit status. + # Import scripts as modules qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) + def readfile(filename): """Returns te content of file named filename as a string""" f = file(filename) @@ -287,6 +297,11 @@ class StoreTests(BrokerTest): m = cluster.start("restartme").get_message("q") self.assertEqual("x", m.content) + def stop_cluster(self,broker): + """Clean shut-down of a cluster""" + self.assertEqual(0, qpid_cluster.main( + ["qpid-cluster", "-kf", broker.host_port()])) + def test_persistent_restart(self): """Verify persistent cluster shutdown/restart scenarios""" cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) @@ -302,7 +317,7 @@ class StoreTests(BrokerTest): self.assertEqual(c.get_message("q").content, "2") # Shut down the entire cluster cleanly and bring it back up a.send_message("q", Message("3", durable=True)) - self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()])) + self.stop_cluster(a) a = cluster.start("a", wait=False) b = cluster.start("b", wait=False) c = cluster.start("c", wait=True) @@ -320,7 +335,7 @@ class StoreTests(BrokerTest): b.kill() self.assertEqual(c.get_message("q").content, "4") c.send_message("q", Message("clean", durable=True)) - self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()])) + self.stop_cluster(c) a = cluster.start("a", wait=False) b = cluster.start("b", wait=False) c = cluster.start("c", wait=True) @@ -333,7 +348,7 @@ class StoreTests(BrokerTest): a.terminate() cluster2 = self.cluster(1, args=self.args()) try: - a = cluster2.start("a", expect=EXPECT_EXIT_OK) + a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) a.ready() self.fail("Expected exception") except: pass @@ -343,27 +358,29 @@ class StoreTests(BrokerTest): cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.stop_cluster(a) self.assertEqual(a.wait(), 0) self.assertEqual(b.wait(), 0) # Restart with a different member and shut down. a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) - self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.stop_cluster(a) self.assertEqual(a.wait(), 0) self.assertEqual(c.wait(), 0) - # Mix members from both shutdown events, they should fail - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + # FIXME aconway 2010-03-11: can't predict the exit status of these + # as it depends on the order of delivery of initial-status messages. + # See comment at top of this file. + a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) + b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False) self.assertRaises(Exception, lambda: a.ready()) self.assertRaises(Exception, lambda: b.ready()) def assert_dirty_store(self, broker): - self.assertRaises(Exception, lambda: broker.ready()) + assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log msg = re.compile("critical.*no clean store") - assert msg.search(readfile(broker.log)) + assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log def test_solo_store_clean(self): # A single node cluster should always leave a clean store. @@ -375,7 +392,6 @@ class StoreTests(BrokerTest): self.assertEqual(a.get_message("q").content, "x") def test_last_store_clean(self): - # Verify that only the last node in a cluster to shut down has # a clean store. Start with cluster of 3, reduce to 1 then # increase again to ensure that a node that was once alone but @@ -394,13 +410,41 @@ class StoreTests(BrokerTest): time.sleep(0.1) # pause for a to find out hes last. a.kill() # really last # b & c should be dirty - b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK) + b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) self.assert_dirty_store(b) - c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK) + c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL) self.assert_dirty_store(c) # a should be clean a = cluster.start("a") self.assertEqual(a.get_message("q").content, "x") + def test_restart_clean(self): + """Verify that we can re-start brokers one by one in a + persistent cluster after a clean oshutdown""" + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_OK) + b = cluster.start("b", expect=EXPECT_EXIT_OK) + c = cluster.start("c", expect=EXPECT_EXIT_OK) + a.send_message("q", Message("x", durable=True)) + self.stop_cluster(a) + a = cluster.start("a") + b = cluster.start("b") + c = cluster.start("c") + self.assertEqual(c.get_message("q").content, "x") + def test_join_sub_size(self): + """Verify that after starting a cluster with cluster-size=N, + we can join new members even if size < N-1""" + cluster = self.cluster(0, self.args()) + a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL) + b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) + c = cluster.start("c") + a.send_message("q", Message("x", durable=True)) + a.send_message("q", Message("y", durable=True)) + a.kill() + b.kill() + a = cluster.start("a") + self.assertEqual(c.get_message("q").content, "x") + b = cluster.start("b") + self.assertEqual(c.get_message("q").content, "y") diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 500d41f85d..26b46ad468 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -268,6 +268,7 @@ class Broker(Popen): test.cleanup_stop(self) self._host = "localhost" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) + self._log_ready = False def host(self): return self._host @@ -343,12 +344,14 @@ class Broker(Popen): def log_ready(self): """Return true if the log file exists and contains a broker ready message""" + if self._log_ready: return True if not os.path.exists(self.log): return False - ready_msg = re.compile("notice Broker running") f = file(self.log) try: for l in f: - if ready_msg.search(l): return True + if "notice Broker running" in l: + self._log_ready = True + return True return False finally: f.close() @@ -445,7 +448,7 @@ class BrokerTest(TestCase): if (wait): try: b.ready() except Exception, e: - raise Exception("Failed to start broker %s: %s" % ( b.name, e)) + raise Exception("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): |