diff options
author | Alan Conway <aconway@apache.org> | 2009-11-24 20:07:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-11-24 20:07:24 +0000 |
commit | a53255f11a8f8ee49aadec889981cea03934cc72 (patch) | |
tree | 680d2055af6297cfbf77e4eaa57f3c9f7d78f90d /qpid/cpp/src | |
parent | 892cfdf7c3578d27603c4ae4a54ac5aec101d521 (diff) | |
download | qpid-python-a53255f11a8f8ee49aadec889981cea03934cc72.tar.gz |
Support for restarting a persistent cluster.
Option --cluster-size=N: members wait for N members before recovering store.
Stores marked as clean/dirty. Automatically recover from clean store on restart.
Stores marked with UUID to detect errors.
Not yet implemented: consistency checks, manual recovery from all dirty stores.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@883842 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
22 files changed, 616 insertions, 160 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index a2ec661ccf..9da6578df0 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -86,7 +86,9 @@ cluster_la_SOURCES = \ qpid/cluster/InitialStatusMap.cpp \ qpid/cluster/MemberSet.h \ qpid/cluster/MemberSet.cpp \ - qpid/cluster/types.h + qpid/cluster/types.h \ + qpid/cluster/StoreStatus.h \ + qpid/cluster/StoreStatus.cpp cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing diff --git a/qpid/cpp/src/qpid/Plugin.cpp b/qpid/cpp/src/qpid/Plugin.cpp index 4368e15d27..196b5c2333 100644 --- a/qpid/cpp/src/qpid/Plugin.cpp +++ b/qpid/cpp/src/qpid/Plugin.cpp @@ -50,9 +50,16 @@ void Plugin::Target::addFinalizer(const boost::function<void()>& f) { finalizers.push_back(f); } +namespace { +bool initBefore(const Plugin* a, const Plugin* b) { + return a->initOrder() < b->initOrder(); +} +} + Plugin::Plugin() { // Register myself. thePlugins().push_back(this); + std::sort(thePlugins().begin(), thePlugins().end(), &initBefore); } Plugin::~Plugin() {} @@ -74,7 +81,14 @@ void Plugin::addOptions(Options& opts) { } } -void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t))); } -void Plugin::initializeAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t))); } +int Plugin::initOrder() const { return DEFAULT_INIT_ORDER; } + +void Plugin::earlyInitAll(Target& t) { + each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t))); +} + +void Plugin::initializeAll(Target& t) { + each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t))); +} } // namespace qpid diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h index 88214babb8..4a65ea6059 100644 --- a/qpid/cpp/src/qpid/Plugin.h +++ b/qpid/cpp/src/qpid/Plugin.h @@ -37,6 +37,8 @@ struct Options; class Plugin : private boost::noncopyable { public: typedef std::vector<Plugin*> Plugins; + /** Default value returned by initOrder() */ + static const int DEFAULT_INIT_ORDER=1000; /** * Base interface for targets that can receive plug-ins. @@ -99,6 +101,12 @@ class Plugin : private boost::noncopyable { */ virtual void initialize(Target&) = 0; + /** + * Initialization order, lower initOrder() plugins are + * initialized first. @see DEFAULT_INIT_ORDER + */ + virtual int initOrder() const; + /** List of registered Plugin objects. * Caller must not delete plugin pointers. */ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 58569f5503..3c67c429a0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -138,7 +138,7 @@ Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), - store(0), + store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), queues(this), @@ -204,17 +204,11 @@ Broker::Broker(const Broker::Options& conf) : queues.setQueueEvents(&queueEvents); // Early-Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->earlyInitialize(*this); + Plugin::earlyInitAll(*this); // If no plugin store module registered itself, set up the null store. - if (store.get() == 0) { - boost::shared_ptr<MessageStore> p(new NullMessageStore()); - setStore (p); - } + if (NullMessageStore::isNullStore(store.get())) + setStore(); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -259,10 +253,7 @@ Broker::Broker(const Broker::Options& conf) : } // Initialize plugins - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->initialize(*this); + Plugin::initializeAll(*this); if (conf.queueCleanInterval) { queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); @@ -304,6 +295,10 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts) void Broker::setStore (boost::shared_ptr<MessageStore>& _store) { store.reset(new MessageStoreModule (_store)); + setStore(); +} + +void Broker::setStore () { queues.setStore (store.get()); dtxManager.setStore (store.get()); links.setStore (store.get()); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 5e14aa487d..73d5860cb3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -142,6 +142,7 @@ public: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; void declareStandardExchange(const std::string& name, const std::string& type); + void setStore (); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 5e962e9767..07fdc6fc93 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -83,6 +83,25 @@ * - Connection control events carrying cluster.connection commands. * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. + * + * CLUSTER INITIALIZATION OVERVIEW + * + * When a new member joins the CPG group, all members (including the + * new one) multicast their "initial status." The new member is in + * INIT mode until it gets a complete set of initial status messages + * from all cluster members. + * + * The newcomer uses initial status to determine + * - The cluster UUID + * - Am I speaking the correct version of the cluster protocol? + * - Do I need to get an update from an existing active member? + * - Can I recover from my own store? + * + * Initialization happens in the Cluster constructor (plugin + * early-init phase) because it needs to be done before the store + * initializes. In INIT mode sending & receiving from the cluster are + * done single-threaded, bypassing the normal PollableQueues because + * the Poller is not active at this point to service them. */ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterSettings.h" @@ -97,6 +116,7 @@ #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SignalHandler.h" @@ -162,9 +182,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } - void initialStatus(bool active, bool persistent, const Uuid& clusterId, - uint32_t version, const std::string& url) { - cluster.initialStatus(member, active, persistent, clusterId, version, url, l); + + void initialStatus(uint32_t version, bool active, const Uuid& clusterId, + uint8_t storeState, const Uuid& start, const Uuid& stop) + { + cluster.initialStatus(member, version, active, clusterId, + framing::cluster::StoreState(storeState), start, stop, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -204,11 +227,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), quorum(boost::bind(&Cluster::leave, this)), - initialized(false), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), initMap(self, settings.size), + store(broker.getDataDir().getPath()), lastSize(0), lastBroker(false), updateRetracted(false), @@ -226,12 +249,17 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : failoverExchange.reset(new FailoverExchange(this)); broker.getExchanges().registerExchange(failoverExchange); - // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. - broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Update exchange is used during updates to replicate messages + // without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange( + boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Load my store status before we go into initialization + if (! broker::NullMessageStore::isNullStore(&broker.getStore())) + store.load(); cpg.join(name); - // pump the CPG dispatch manually till we get initialized. - while (!initialized) + // Pump the CPG dispatch manually till we get initialized. + while (state == INIT) cpg.dispatchOne(); } @@ -243,12 +271,24 @@ void Cluster::initialize() { if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); + // Cluster constructor will leave us in either READY or JOINER state. + switch (state) { + case READY: + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); + break; + case JOINER: + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + break; + default: + assert(0); + } + QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); + // Add finalizer last for exception safety. broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } @@ -344,11 +384,21 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { - deliverEventQueue.push(e); + // During initialization, execute events directly in the same thread. + // Once initialized, push to pollable queue to be processed in another thread. + if (state == INIT) + deliveredEvent(e); + else + deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - deliverFrameQueue.push(e); + // During initialization, execute events directly in the same thread. + // Once initialized, push to pollable queue to be processed in another thread. + if (state == INIT) + deliveredFrame(e); + else + deliverFrameQueue.push(e); } const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { @@ -524,12 +574,6 @@ void Cluster::configChange ( const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - if (state == INIT) { - // FIXME aconway 2009-11-16: persistent restart - // Recover only if we are first in cluster. - broker.setRecovery(nCurrent == 1); - initialized = true; - } QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent) << "(" << AddrList(joined, nJoined, "joined: ") @@ -544,30 +588,42 @@ void Cluster::configChange ( void Cluster::setReady(Lock&) { state = READY; if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - mcast.release(); + mcast.setReady(); broker.getQueueEvents().enable(); } void Cluster::initMapCompleted(Lock& l) { + // Called on completion of the initial status map. if (state == INIT) { + // We have status for all members so we can make join descisions. elders = initMap.getElders(); + QPID_LOG(debug, *this << " elders: " << elders); if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. broker.getLinks().setPassive(true); broker.getQueueEvents().disable(); + QPID_LOG(info, *this << " not active for links."); } + else { + QPID_LOG(info, this << " active for links."); + } + setClusterId(initMap.getClusterId(), l); + // FIXME aconway 2009-11-20: store id == cluster id. + // Clean up redundant copy of id in InitialStatus + // Use store ID as advertized cluster ID. + // Consistency check on cluster ID vs. locally stored ID. + // throw rathr than assert in StoreStatus. + if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. + broker.setRecovery(false); // Ditch my current store. state = JOINER; - mcast.mcastControl( - ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); } else { // I can go ready. - QPID_LOG(notice, *this << " ready."); discarding = false; setReady(l); - map = ClusterMap(initMap.getMemberUrls()); memberUpdate(l); } + QPID_LOG(debug, *this << "Initialization complete"); } } @@ -587,9 +643,11 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& initMap.configChange(config); if (initMap.isResendNeeded()) { mcast.mcastControl( - // FIXME aconway 2009-11-17: persistent restart, set persistence bit. - ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId, - CLUSTER_VERSION, myUrl.str()), self); + ClusterInitialStatusBody( + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getStart(), store.getStop() + ), + self); } if (initMap.transitionToComplete()) initMapCompleted(l); @@ -597,6 +655,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& memberUpdate(l); if (elders.empty()) { // We are the oldest, reactive links if necessary + QPID_LOG(info, this << " becoming active for links."); broker.getLinks().setPassive(false); } } @@ -628,9 +687,11 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) makeOffer(id, l); } -void Cluster::initialStatus(const MemberId& member, bool active, bool persistent, - const framing::Uuid& id, uint32_t version, - const std::string& url, Lock& l) +void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, + const framing::Uuid& id, + framing::cluster::StoreState store, + const framing::Uuid& start, const framing::Uuid& end, + Lock& l) { if (version != CLUSTER_VERSION) { QPID_LOG(critical, *this << " incompatible cluster versions " << @@ -640,9 +701,13 @@ void Cluster::initialStatus(const MemberId& member, bool active, bool persistent } initMap.received( member, - ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url) + ClusterInitialStatusBody( + ProtocolVersion(), version, active, id, store, start, end) ); - if (initMap.transitionToComplete()) initMapCompleted(l); + if (initMap.transitionToComplete()) { + QPID_LOG(debug, *this << " initial status map complete. "); + initMapCompleted(l); + } } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { @@ -650,7 +715,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { memberUpdate(l); if (state == CATCHUP && id == self) { setReady(l); - QPID_LOG(notice, *this << " caught up, active cluster member."); + QPID_LOG(notice, *this << " caught up."); } } @@ -770,8 +835,7 @@ void Cluster::updateOutDone(Lock& l) { QPID_LOG(notice, *this << " update sent"); assert(state == UPDATER); state = READY; - mcast.release(); - deliverEventQueue.start(); // Start processing events again. + deliverEventQueue.start(); // Start processing events again. makeOffer(map.firstJoiner(), l); // Try another offer } @@ -781,8 +845,10 @@ void Cluster::updateOutError(const std::exception& e) { updateOutDone(l); } -void Cluster ::shutdown(const MemberId& id, Lock& l) { - QPID_LOG(notice, *this << " received shutdown from " << id); +void Cluster ::shutdown(const MemberId& , Lock& l) { + QPID_LOG(notice, *this << " cluster shut down by administrator."); + // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command. + if (store.hasStore()) store.clean(Uuid(true)); leave(l); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index aff703c081..c1ee0c2be1 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,7 +19,6 @@ * */ -#include "InitialStatusMap.h" #include "ClusterMap.h" #include "ClusterSettings.h" #include "Cpg.h" @@ -29,12 +28,14 @@ #include "EventFrame.h" #include "ExpiryPolicy.h" #include "FailoverExchange.h" +#include "InitialStatusMap.h" #include "LockedConnectionMap.h" #include "Multicaster.h" #include "NoOpConnectionOutputHandler.h" #include "PollableQueue.h" #include "PollerDispatch.h" #include "Quorum.h" +#include "StoreStatus.h" #include "UpdateReceiver.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" @@ -147,9 +148,14 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, Lock&); void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); - void initialStatus(const MemberId&, bool active, bool persistent, - const framing::Uuid& id, uint32_t version, - const std::string& url, Lock&); + void initialStatus(const MemberId&, + uint32_t version, + bool active, + const framing::Uuid& id, + framing::cluster::StoreState, + const framing::Uuid& start, + const framing::Uuid& end, + Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); @@ -228,9 +234,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { Quorum quorum; LockedConnectionMap localConnections; - // Used only during initialization - bool initialized; - // Used only in deliverEventQueue thread or when stalled for update. Decoder decoder; bool discarding; @@ -259,6 +262,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { ConnectionMap connections; InitialStatusMap initMap; + StoreStatus store; ClusterMap map; MemberSet elders; size_t lastSize; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 4eec388866..aab05f8ab4 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -126,6 +126,9 @@ struct ClusterPlugin : public Plugin { ClusterPlugin() : options(settings), cluster(0) {} + // Cluster needs to be initialized after the store + int initOrder() const { return Plugin::DEFAULT_INIT_ORDER+500; } + Options* getOptions() { return &options; } void earlyInitialize(Plugin::Target& target) { diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index 4831e7eabe..52564990f6 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -115,16 +115,16 @@ const AMQFrame& Event::getFrame() const { static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; -std::ostream& operator << (std::ostream& o, EventType t) { +std::ostream& operator<< (std::ostream& o, EventType t) { return o << EVENT_TYPE_NAMES[t]; } -std::ostream& operator << (std::ostream& o, const EventHeader& e) { +std::ostream& operator<< (std::ostream& o, const EventHeader& e) { return o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; } -std::ostream& operator << (std::ostream& o, const Event& e) { +std::ostream& operator<< (std::ostream& o, const Event& e) { o << "Event[" << e.getConnectionId() << " "; if (e.getType() == CONTROL) o << e.getFrame(); diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp index f2251f4043..51d6140008 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -19,15 +19,17 @@ * */ #include "InitialStatusMap.h" +#include "StoreStatus.h" #include <algorithm> #include <boost/bind.hpp> -using namespace std; -using namespace boost; - namespace qpid { namespace cluster { +using namespace std; +using namespace boost; +using namespace framing::cluster; + InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) : self(self_), completed(), resendNeeded(), size(size_) {} @@ -78,10 +80,6 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) { return !v.second; } -bool InitialStatusMap::isActive(const Map::value_type& v) { - return v.second && v.second->getActive(); -} - bool InitialStatusMap::isComplete() { return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end() && (map.size() >= size); @@ -97,10 +95,35 @@ bool InitialStatusMap::isResendNeeded() { return ret; } +bool InitialStatusMap::isActive(const Map::value_type& v) { + return v.second && v.second->getActive(); +} + +bool InitialStatusMap::hasStore(const Map::value_type& v) { + return v.second && + (v.second->getStoreState() == STORE_STATE_CLEAN_STORE || + v.second->getStoreState() == STORE_STATE_DIRTY_STORE); +} + bool InitialStatusMap::isUpdateNeeded() { + // FIXME aconway 2009-11-20: consistency checks isComplete or here? assert(isComplete()); - // If there are any active members we need an update. - return find_if(map.begin(), map.end(), &isActive) != map.end(); + // We need an update if there are any active members. + if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true; + + // Otherwise it depends on store status, get my own status: + Map::iterator me = map.find(self); + assert(me != map.end()); + assert(me->second); + switch (me->second->getStoreState()) { + case STORE_STATE_NO_STORE: + case STORE_STATE_EMPTY_STORE: + // If anybody has a store then we need an update. + return find_if(map.begin(), map.end(), &hasStore) != map.end(); + case STORE_STATE_DIRTY_STORE: return true; + case STORE_STATE_CLEAN_STORE: return false; // Use our own store + } + return false; } MemberSet InitialStatusMap::getElders() { @@ -125,15 +148,4 @@ framing::Uuid InitialStatusMap::getClusterId() { return map.begin()->second->getClusterId(); } -std::map<MemberId, Url> InitialStatusMap::getMemberUrls() { - assert(isComplete()); - assert(!isUpdateNeeded()); - std::map<MemberId, Url> urlMap; - for (Map::iterator i = map.begin(); i != map.end(); ++i) { - assert(i->second); - urlMap.insert(std::make_pair(i->first, i->second->getUrl())); - } - return urlMap; -} - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h index 9e9b71e363..72963ea2bb 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h @@ -57,15 +57,11 @@ class InitialStatusMap /**@pre isComplete(). @return Cluster-wide cluster ID. */ framing::Uuid getClusterId(); - /**@pre isComplete() && !isUpdateNeeded(). - *@return member->URL map for all members. - */ - std::map<MemberId, Url> getMemberUrls(); - private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); static bool isActive(const Map::value_type&); + static bool hasStore(const Map::value_type&); void check(); Map map; MemberSet firstConfig; diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index 72fc1533f8..229d7edb1e 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -33,36 +33,41 @@ Multicaster::Multicaster(Cpg& cpg_, boost::function<void()> onError_) : onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + ready(false) { queue.start(); } void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { - QPID_LOG(trace, "MCAST " << id << ": " << body); mcast(Event::control(body, id)); } void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) { - QPID_LOG(trace, "MCAST " << id << ": " << frame); mcast(Event::control(frame, id)); } void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { Event e(DATA, id, size); memcpy(e.getData(), data, size); - QPID_LOG(trace, "MCAST " << e); mcast(e); } void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); - if (e.isConnection() && holding) { - holdingQueue.push_back(e); + if (!ready) { + if (e.isConnection()) holdingQueue.push_back(e); + else { + iovec iov = e.toIovec(); + // FIXME aconway 2009-11-23: configurable retry --cluster-retry + if (!cpg.mcast(&iov, 1)) + throw Exception("CPG flow control error during initialization"); + QPID_LOG(trace, "MCAST (direct) " << e); + } return; } } + QPID_LOG(trace, "MCAST " << e); queue.push(e); } @@ -88,9 +93,9 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co } } -void Multicaster::release() { +void Multicaster::setReady() { sys::Mutex::ScopedLock l(lock); - holding = false; + ready = true; std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1)); holdingQueue.clear(); } diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index c1a0ddffc6..2db84a9ce0 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -41,11 +41,21 @@ class Cpg; /** * Multicast to the cluster. Shared, thread safe object. + * + * Runs in two modes; + * + * initializing: Hold connection mcast events. Multicast cluster + * events directly in the calling thread. This mode is used before + * joining the cluster where the poller may not yet be active and we + * want to hold any connection traffic till we join. + * + * ready: normal operation. Queues all mcasts on a pollable queue, + * multicasts connection and cluster events. */ class Multicaster { public: - /** Starts in holding mode: connection data events are held, other events are mcast */ + /** Starts in initializing mode. */ Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>&, boost::function<void()> onError @@ -54,9 +64,10 @@ class Multicaster void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&); void mcastBuffer(const char*, size_t, const ConnectionId&); void mcast(const Event& e); - /** End holding mode, held events are mcast */ - void release(); - + + /** Switch to ready mode. */ + void setReady(); + private: typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; @@ -67,7 +78,7 @@ class Multicaster boost::function<void()> onError; Cpg& cpg; PollableEventQueue queue; - bool holding; + bool ready; PlainEventQueue holdingQueue; std::vector<struct ::iovec> ioVector; }; diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp new file mode 100644 index 0000000000..1c5f581ea1 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "StoreStatus.h" +#include "qpid/Exception.h" +#include <boost/filesystem/path.hpp> +#include <boost/filesystem/fstream.hpp> +#include <boost/filesystem/operations.hpp> +#include <fstream> + +namespace qpid { +namespace cluster { + +using framing::Uuid; +using namespace framing::cluster; +using namespace boost::filesystem; + +StoreStatus::StoreStatus(const std::string& d) + : state(STORE_STATE_NO_STORE), dataDir(d) +{} + +namespace { + +const char* SUBDIR="cluster"; +const char* START_FILE="start"; +const char* STOP_FILE="stop"; + +Uuid loadUuid(const path& path) { + Uuid ret; + if (exists(path)) { + ifstream i(path); + i >> ret; + } + return ret; +} + +void saveUuid(const path& path, const Uuid& uuid) { + ofstream o(path); + o << uuid; +} + +} // namespace + + +void StoreStatus::load() { + path dir = path(dataDir)/SUBDIR; + create_directory(dir); + start = loadUuid(dir/START_FILE); + stop = loadUuid(dir/STOP_FILE); + + if (start && stop) state = STORE_STATE_CLEAN_STORE; + else if (start) state = STORE_STATE_DIRTY_STORE; + else state = STORE_STATE_EMPTY_STORE; +} + +void StoreStatus::save() { + path dir = path(dataDir)/SUBDIR; + create_directory(dir); + saveUuid(dir/START_FILE, start); + saveUuid(dir/STOP_FILE, stop); +} + +void StoreStatus::dirty(const Uuid& start_) { + start = start_; + stop = Uuid(); + state = STORE_STATE_DIRTY_STORE; + save(); +} + +void StoreStatus::clean(const Uuid& stop_) { + assert(start); // FIXME aconway 2009-11-20: exception? + assert(stop_); + state = STORE_STATE_CLEAN_STORE; + stop = stop_; + save(); +} + +}} // namespace qpid::cluster + diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h new file mode 100644 index 0000000000..b4c6bda480 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h @@ -0,0 +1,61 @@ +#ifndef QPID_CLUSTER_STORESTATE_H +#define QPID_CLUSTER_STORESTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/enum.h" + +namespace qpid { +namespace cluster { + +/** + * State of the store for cluster purposes. + */ +class StoreStatus +{ + public: + typedef framing::Uuid Uuid; + typedef framing::cluster::StoreState StoreState; + + StoreStatus(const std::string& dir); + + framing::cluster::StoreState getState() const { return state; } + Uuid getStart() const { return start; } + Uuid getStop() const { return stop; } + + void dirty(const Uuid& start); // Start using the store. + void clean(const Uuid& stop); // Stop using the store. + + void load(); + void save(); + + bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } + bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; } + private: + framing::cluster::StoreState state; + Uuid start, stop; + std::string dataDir; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_STORESTATE_H*/ diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp index f7c13ad8d4..432c7ab94e 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.cpp +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -57,7 +57,7 @@ void Uuid::clear() { } // Force int 0/!0 to false/true; avoids compile warnings. -bool Uuid::isNull() { +bool Uuid::isNull() const { return !!uuid_is_null(data()); } diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp index c3587965e5..e6a3ec1620 100644 --- a/qpid/cpp/src/tests/InitialStatusMap.cpp +++ b/qpid/cpp/src/tests/InitialStatusMap.cpp @@ -26,6 +26,7 @@ using namespace std; using namespace qpid::cluster; using namespace qpid::framing; +using namespace qpid::framing::cluster; using namespace boost::assign; namespace qpid { @@ -35,8 +36,19 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) typedef InitialStatusMap::Status Status; -Status activeStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), true, false, id, 0, ""); } -Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), false, false, id, 0, ""); } +Status activeStatus(const Uuid& id=Uuid()) { + return Status(ProtocolVersion(), 0, true, id, + STORE_STATE_NO_STORE, Uuid(), Uuid()); +} + +Status newcomerStatus(const Uuid& id=Uuid()) { + return Status(ProtocolVersion(), 0, false, id, + STORE_STATE_NO_STORE, Uuid(), Uuid()); +} + +Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) { + return Status(ProtocolVersion(), 0, active, Uuid(), state, start, stop); +} QPID_AUTO_TEST_CASE(testFirstInCluster) { // Single member is first in cluster. @@ -173,6 +185,66 @@ QPID_AUTO_TEST_CASE(testInitialSize) { BOOST_CHECK(map.isComplete()); } +QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) { + InitialStatusMap map(MemberId(0), 3); + map.configChange(list_of<MemberId>(0)(1)(2)); + map.received(MemberId(0), storeStatus(false, STORE_STATE_CLEAN_STORE)); + map.received(MemberId(1), storeStatus(false, STORE_STATE_CLEAN_STORE)); + map.received(MemberId(2), storeStatus(false, STORE_STATE_CLEAN_STORE)); + BOOST_CHECK(!map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testAllEmptyNoUpdate) { + InitialStatusMap map(MemberId(0), 3); + map.configChange(list_of<MemberId>(0)(1)(2)); + map.received(MemberId(0), storeStatus(false, STORE_STATE_EMPTY_STORE)); + map.received(MemberId(1), storeStatus(false, STORE_STATE_EMPTY_STORE)); + map.received(MemberId(2), storeStatus(false, STORE_STATE_EMPTY_STORE)); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK(!map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testAllNoStoreNoUpdate) { + InitialStatusMap map(MemberId(0), 3); + map.configChange(list_of<MemberId>(0)(1)(2)); + map.received(MemberId(0), storeStatus(false, STORE_STATE_NO_STORE)); + map.received(MemberId(1), storeStatus(false, STORE_STATE_NO_STORE)); + map.received(MemberId(2), storeStatus(false, STORE_STATE_NO_STORE)); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK(!map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testDirtyNeedUpdate) { + InitialStatusMap map(MemberId(0), 3); + map.configChange(list_of<MemberId>(0)(1)(2)); + map.received(MemberId(0), storeStatus(false, STORE_STATE_DIRTY_STORE)); + map.received(MemberId(1), storeStatus(false, STORE_STATE_CLEAN_STORE)); + map.received(MemberId(2), storeStatus(false, STORE_STATE_CLEAN_STORE)); + BOOST_CHECK(map.transitionToComplete()); + BOOST_CHECK(map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testEmptyNeedUpdate) { + InitialStatusMap map(MemberId(0), 3); + map.configChange(list_of<MemberId>(0)(1)(2)); + map.received(MemberId(0), storeStatus(false, STORE_STATE_EMPTY_STORE)); + map.received(MemberId(1), storeStatus(false, STORE_STATE_CLEAN_STORE)); + map.received(MemberId(2), storeStatus(false, STORE_STATE_CLEAN_STORE)); + BOOST_CHECK(map.transitionToComplete()); + BOOST_CHECK(map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testEmptyAlone) { + InitialStatusMap map(MemberId(0), 1); + map.configChange(list_of<MemberId>(0)); + map.received(MemberId(0), storeStatus(false, STORE_STATE_EMPTY_STORE)); + BOOST_CHECK(map.transitionToComplete()); + BOOST_CHECK(!map.isUpdateNeeded()); +} + +// FIXME aconway 2009-11-20: consistency tests for mixed stores, +// tests for manual intervention case. + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/StoreStatus.cpp b/qpid/cpp/src/tests/StoreStatus.cpp new file mode 100644 index 0000000000..37ba19e34a --- /dev/null +++ b/qpid/cpp/src/tests/StoreStatus.cpp @@ -0,0 +1,109 @@ + /* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/cluster/StoreStatus.h" +#include "qpid/framing/Uuid.h" +#include <boost/assign.hpp> +#include <boost/filesystem/operations.hpp> + +using namespace std; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::framing::cluster; +using namespace boost::assign; +using namespace boost::filesystem; + + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(StoreStatusTestSuite) + +const char* TEST_DIR = "StoreStatus.tmp"; + +QPID_AUTO_TEST_CASE(testLoadEmpty) { + create_directory(TEST_DIR); + StoreStatus ss(TEST_DIR); + BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_NO_STORE); + BOOST_CHECK(!ss.getStart()); + BOOST_CHECK(!ss.getStop()); + ss.load(); + BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_EMPTY_STORE); + BOOST_CHECK(!ss.getStop()); + remove_all(TEST_DIR); +} + +QPID_AUTO_TEST_CASE(testSaveLoadDirty) { + create_directory(TEST_DIR); + Uuid start = Uuid(true); + StoreStatus ss(TEST_DIR); + ss.load(); + ss.dirty(start); + BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE); + + StoreStatus ss2(TEST_DIR); + ss2.load(); + BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE); + BOOST_CHECK_EQUAL(ss2.getStart(), start); + BOOST_CHECK(!ss2.getStop()); + remove_all(TEST_DIR); +} + +QPID_AUTO_TEST_CASE(testSaveLoadClean) { + create_directory(TEST_DIR); + Uuid start = Uuid(true); + Uuid stop = Uuid(true); + StoreStatus ss(TEST_DIR); + ss.load(); + ss.dirty(start); + ss.clean(stop); + BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE); + + StoreStatus ss2(TEST_DIR); + ss2.load(); + BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_CLEAN_STORE); + BOOST_CHECK_EQUAL(ss2.getStart(), start); + BOOST_CHECK_EQUAL(ss2.getStop(), stop); + remove_all(TEST_DIR); +} + +QPID_AUTO_TEST_CASE(testMarkDirty) { + // Save clean then mark to dirty. + create_directory(TEST_DIR); + Uuid start = Uuid(true); + Uuid stop = Uuid(true); + StoreStatus ss(TEST_DIR); + ss.load(); + ss.dirty(start); + ss.clean(stop); + ss.dirty(start); + + StoreStatus ss2(TEST_DIR); + ss2.load(); + BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE); + BOOST_CHECK_EQUAL(ss2.getStart(), start); + BOOST_CHECK(!ss2.getStop()); + remove_all(TEST_DIR); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index f33f87ee62..20053788e4 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -76,7 +76,8 @@ cluster_test_SOURCES = \ ForkedBroker.cpp \ PartialFailure.cpp \ ClusterFailover.cpp \ - InitialStatusMap.cpp + InitialStatusMap.cpp \ + StoreStatus.cpp cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index ed39277f77..65c91b1d81 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,13 +18,18 @@ # under the License. # -import os, signal, sys, time +import os, signal, sys, time, imp from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped from qpid.messaging import Message from threading import Thread +from logging import getLogger +log = getLogger("qpid.cluster_tests") + +# Import scripts as modules +qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) class ShortTests(BrokerTest): """Short cluster functionality tests.""" @@ -34,8 +39,8 @@ class ShortTests(BrokerTest): # Start a cluster, send some messages to member 0. cluster = self.cluster(2) s0 = cluster[0].connect().session() - s0.sender("q; {create:always}").send(messaging.Message("x")) - s0.sender("q; {create:always}").send(messaging.Message("y")) + s0.sender("q; {create:always}").send(Message("x")) + s0.sender("q; {create:always}").send(Message("y")) s0.connection.close() # Verify messages available on member 1. @@ -52,35 +57,6 @@ class ShortTests(BrokerTest): self.assertEqual("y", m.content) s2.connection.close() - def test_cluster_size(self): - """Verify cluster startup waits for N brokers if --cluster-size=N""" - class ConnectThread(Thread): - def __init__(self, broker): - Thread.__init__(self) - self.broker=broker - self.connected = False - self.error = None - - def run(self): - try: - self.broker.connect() - self.connected = True - except Exception, e: self.error = RethrownException(e) - - cluster = self.cluster(1, args=["--cluster-size=3"], wait_for_start=False) - c = ConnectThread(cluster[0]) - c.start() - time.sleep(.01) - assert not c.connected - cluster.start(wait_for_start=False) - time.sleep(.01) - assert not c.connected - cluster.start(wait_for_start=False) - c.join(1) - assert not c.isAlive() # Join didn't time out - assert c.connected - if c.error: raise c.error - class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -120,20 +96,22 @@ class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. """ - args = ["--load-module",BrokerTest.store_lib] + def args(self): + assert BrokerTest.store_lib + return ["--load-module", BrokerTest.store_lib] def test_store_loaded(self): """Ensure we are indeed loading a working store""" - broker = self.broker(self.args, name="recoverme", expect=EXPECT_EXIT_FAIL) - m = messaging.Message("x", durable=True) + broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL) + m = Message("x", durable=True) broker.send_message("q", m) broker.kill() - broker = self.broker(self.args, name="recoverme") + broker = self.broker(self.args(), name="recoverme") self.assertEqual("x", broker.get_message("q").content) def test_kill_restart(self): """Verify we can kill/resetart a broker with store in a cluster""" - cluster = self.cluster(1, self.args) + cluster = self.cluster(1, self.args()) cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill() # Send a message, retrieve from the restarted broker @@ -141,19 +119,42 @@ class StoreTests(BrokerTest): m = cluster.start("restartme").get_message("q") self.assertEqual("x", m.content) - def test_total_shutdown(self): - """Test we use the correct store to recover after total shutdown""" - cluster = self.cluster(2, args=self.args, expect=EXPECT_EXIT_FAIL) - cluster[0].send_message("q", Message("a", durable=True)) - cluster[0].kill() - self.assertEqual("a", cluster[1].get_message("q").content) - cluster[1].send_message("q", Message("b", durable=True)) - cluster[1].kill() - - # Start 1 first, we should see its store used. - cluster.start(name=cluster.name+"-1") - cluster.start(name=cluster.name+"-0") - self.assertEqual("b", cluster[2].get_message("q").content) - + def test_persistent_restart(self): + """Verify persistent cluster shutdown/restart scenarios""" + cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True) + a.send_message("q", Message("1", durable=True)) + # Kill & restart one member. + c.kill() + self.assertEqual(a.get_message("q").content, "1") + a.send_message("q", Message("2", durable=True)) + c = cluster.start("c", expect=EXPECT_EXIT_OK) + self.assertEqual(c.get_message("q").content, "2") + # Shut down the entire cluster cleanly and bring it back up + a.send_message("q", Message("3", durable=True)) + qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]) + a = cluster.start("a", wait_for_start=False) + b = cluster.start("b", wait_for_start=False) + c = cluster.start("c", wait_for_start=True) + self.assertEqual(a.get_message("q").content, "3") + + def test_persistent_partial_failure(self): + # Kill 2 members, shut down the last cleanly then restart + # Ensure we use the clean database + cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True) + a.send_message("q", Message("4", durable=True)) + a.kill() + b.kill() + self.assertEqual(c.get_message("q").content, "4") + c.send_message("q", Message("clean", durable=True)) + qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]) + a = cluster.start("a", wait_for_start=False) + b = cluster.start("b", wait_for_start=False) + c = cluster.start("c", wait_for_start=True) + self.assertEqual(a.get_message("q").content, "clean") - diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test index 4f13b4672c..49d788f41e 100755 --- a/qpid/cpp/src/tests/clustered_replication_test +++ b/qpid/cpp/src/tests/clustered_replication_test @@ -54,10 +54,10 @@ if test -d $PYTHON_DIR; then . $srcdir/ais_check #todo: these cluster names need to be unique to prevent clashes - PRIMARY_CLUSTER=PRIMARY_$(hostname)_$(pwd) - DR_CLUSTER=DR_$(hostname)_$(pwd) + PRIMARY_CLUSTER=PRIMARY_$(hostname)_$$ + DR_CLUSTER=DR_$(hostname)_$$ - GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-enable notice+ --log-to-stderr false" + GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-to-stderr false" PRIMARY_OPTS="--load-module ../.libs/replicating_listener.so --create-replication-queue true --replication-queue REPLICATION_QUEUE --load-module ../.libs/cluster.so --cluster-name $PRIMARY_CLUSTER" DR_OPTS="--load-module ../.libs/replication_exchange.so --load-module ../.libs/cluster.so --cluster-name $DR_CLUSTER" diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests index 9546ddf938..b6c144bb05 100755 --- a/qpid/cpp/src/tests/run_cluster_tests +++ b/qpid/cpp/src/tests/run_cluster_tests @@ -37,7 +37,6 @@ mkdir -p $OUTDIR CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -set -x with_ais_group $TEST_EXEC -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR #exit 0 |