summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/include/qpid/framing/Uuid.h4
-rw-r--r--qpid/cpp/src/cluster.mk4
-rw-r--r--qpid/cpp/src/qpid/Plugin.cpp18
-rw-r--r--qpid/cpp/src/qpid/Plugin.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp136
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h18
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp52
-rw-r--r--qpid/cpp/src/qpid/cluster/InitialStatusMap.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp21
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.h21
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.cpp96
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.h61
-rw-r--r--qpid/cpp/src/qpid/framing/Uuid.cpp2
-rw-r--r--qpid/cpp/src/tests/InitialStatusMap.cpp76
-rw-r--r--qpid/cpp/src/tests/StoreStatus.cpp109
-rw-r--r--qpid/cpp/src/tests/cluster.mk3
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py105
-rwxr-xr-xqpid/cpp/src/tests/clustered_replication_test6
-rwxr-xr-xqpid/cpp/src/tests/run_cluster_tests1
-rw-r--r--qpid/cpp/xml/cluster.xml22
24 files changed, 636 insertions, 166 deletions
diff --git a/qpid/cpp/include/qpid/framing/Uuid.h b/qpid/cpp/include/qpid/framing/Uuid.h
index 618515622d..2cca6e9dfe 100644
--- a/qpid/cpp/include/qpid/framing/Uuid.h
+++ b/qpid/cpp/include/qpid/framing/Uuid.h
@@ -61,7 +61,9 @@ struct Uuid : public boost::array<uint8_t, 16> {
void clear();
/** Test for null (all zeros). */
- bool isNull();
+ bool isNull() const;
+ operator bool() const { return !isNull(); }
+ bool operator!() const { return isNull(); }
QPID_COMMON_EXTERN void encode(framing::Buffer& buf) const;
QPID_COMMON_EXTERN void decode(framing::Buffer& buf);
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index a2ec661ccf..9da6578df0 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -86,7 +86,9 @@ cluster_la_SOURCES = \
qpid/cluster/InitialStatusMap.cpp \
qpid/cluster/MemberSet.h \
qpid/cluster/MemberSet.cpp \
- qpid/cluster/types.h
+ qpid/cluster/types.h \
+ qpid/cluster/StoreStatus.h \
+ qpid/cluster/StoreStatus.cpp
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
diff --git a/qpid/cpp/src/qpid/Plugin.cpp b/qpid/cpp/src/qpid/Plugin.cpp
index 4368e15d27..196b5c2333 100644
--- a/qpid/cpp/src/qpid/Plugin.cpp
+++ b/qpid/cpp/src/qpid/Plugin.cpp
@@ -50,9 +50,16 @@ void Plugin::Target::addFinalizer(const boost::function<void()>& f) {
finalizers.push_back(f);
}
+namespace { // File-local helper, not visible outside Plugin.cpp.
+bool initBefore(const Plugin* a, const Plugin* b) { // Ordering predicate: lower initOrder() sorts first.
+ return a->initOrder() < b->initOrder();
+}
+} // namespace
+
Plugin::Plugin() {
// Register myself.
thePlugins().push_back(this);
+ std::sort(thePlugins().begin(), thePlugins().end(), &initBefore); // Keep the registry ordered by initOrder(). NOTE(review): virtual dispatch does not reach derived overrides inside a base-class constructor, so the plugin being registered here is ranked by Plugin::initOrder(); confirm its final position is still correct.
}
Plugin::~Plugin() {}
@@ -74,7 +81,14 @@ void Plugin::addOptions(Options& opts) {
}
}
-void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t))); }
-void Plugin::initializeAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t))); }
+int Plugin::initOrder() const { return DEFAULT_INIT_ORDER; } // Default rank; plugins override this to run earlier or later.
+void Plugin::earlyInitAll(Target& t) { // Early-initialize every registered plugin against t, lowest initOrder() first.
+ std::stable_sort(thePlugins().begin(), thePlugins().end(), &initBefore); // Re-sort at init time: the sort done in the Plugin constructor cannot see a derived initOrder() for the plugin being registered.
+ each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t)));
+}
+void Plugin::initializeAll(Target& t) { // Initialize every registered plugin against t, lowest initOrder() first.
+ std::stable_sort(thePlugins().begin(), thePlugins().end(), &initBefore); // Same re-sort as earlyInitAll() so this entry point does not depend on call order; stable keeps registration order among equal ranks.
+ each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t)));
+}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h
index 88214babb8..4a65ea6059 100644
--- a/qpid/cpp/src/qpid/Plugin.h
+++ b/qpid/cpp/src/qpid/Plugin.h
@@ -37,6 +37,8 @@ struct Options;
class Plugin : private boost::noncopyable {
public:
typedef std::vector<Plugin*> Plugins;
+ /** Default value returned by initOrder() */
+ static const int DEFAULT_INIT_ORDER=1000;
/**
* Base interface for targets that can receive plug-ins.
@@ -99,6 +101,12 @@ class Plugin : private boost::noncopyable {
*/
virtual void initialize(Target&) = 0;
+ /**
+ * Initialization order, lower initOrder() plugins are
+ * initialized first. @see DEFAULT_INIT_ORDER
+ */
+ virtual int initOrder() const;
+
/** List of registered Plugin objects.
* Caller must not delete plugin pointers.
*/
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 58569f5503..3c67c429a0 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -138,7 +138,7 @@ Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
- store(0),
+ store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -204,17 +204,11 @@ Broker::Broker(const Broker::Options& conf) :
queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
- const Plugin::Plugins& plugins=Plugin::getPlugins();
- for (Plugin::Plugins::const_iterator i = plugins.begin();
- i != plugins.end();
- i++)
- (*i)->earlyInitialize(*this);
+ Plugin::earlyInitAll(*this);
// If no plugin store module registered itself, set up the null store.
- if (store.get() == 0) {
- boost::shared_ptr<MessageStore> p(new NullMessageStore());
- setStore (p);
- }
+ if (NullMessageStore::isNullStore(store.get()))
+ setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -259,10 +253,7 @@ Broker::Broker(const Broker::Options& conf) :
}
// Initialize plugins
- for (Plugin::Plugins::const_iterator i = plugins.begin();
- i != plugins.end();
- i++)
- (*i)->initialize(*this);
+ Plugin::initializeAll(*this);
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
@@ -304,6 +295,10 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
void Broker::setStore (boost::shared_ptr<MessageStore>& _store)
{
store.reset(new MessageStoreModule (_store));
+ setStore();
+}
+
+void Broker::setStore () {
queues.setStore (store.get());
dtxManager.setStore (store.get());
links.setStore (store.get());
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 5e14aa487d..73d5860cb3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -142,6 +142,7 @@ public:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
void declareStandardExchange(const std::string& name, const std::string& type);
+ void setStore ();
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 5e962e9767..07fdc6fc93 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -83,6 +83,25 @@
* - Connection control events carrying cluster.connection commands.
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
+ *
+ * CLUSTER INITIALIZATION OVERVIEW
+ *
+ * When a new member joins the CPG group, all members (including the
+ * new one) multicast their "initial status." The new member is in
+ * INIT mode until it gets a complete set of initial status messages
+ * from all cluster members.
+ *
+ * The newcomer uses initial status to determine
+ * - The cluster UUID
+ * - Am I speaking the correct version of the cluster protocol?
+ * - Do I need to get an update from an existing active member?
+ * - Can I recover from my own store?
+ *
+ * Initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to be done before the store
+ * initializes. In INIT mode sending & receiving from the cluster are
+ * done single-threaded, bypassing the normal PollableQueues because
+ * the Poller is not active at this point to service them.
*/
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterSettings.h"
@@ -97,6 +116,7 @@
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
@@ -162,9 +182,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
- void initialStatus(bool active, bool persistent, const Uuid& clusterId,
- uint32_t version, const std::string& url) {
- cluster.initialStatus(member, active, persistent, clusterId, version, url, l);
+
+ void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
+ uint8_t storeState, const Uuid& start, const Uuid& stop)
+ {
+ cluster.initialStatus(member, version, active, clusterId,
+ framing::cluster::StoreState(storeState), start, stop, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -204,11 +227,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
quorum(boost::bind(&Cluster::leave, this)),
- initialized(false),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
initMap(self, settings.size),
+ store(broker.getDataDir().getPath()),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -226,12 +249,17 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
failoverExchange.reset(new FailoverExchange(this));
broker.getExchanges().registerExchange(failoverExchange);
- // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
- broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+ // Update exchange is used during updates to replicate messages
+ // without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+ // Load my store status before we go into initialization
+ if (! broker::NullMessageStore::isNullStore(&broker.getStore()))
+ store.load();
cpg.join(name);
- // pump the CPG dispatch manually till we get initialized.
- while (!initialized)
+ // Pump the CPG dispatch manually till we get initialized.
+ while (state == INIT)
cpg.dispatchOne();
}
@@ -243,12 +271,24 @@ void Cluster::initialize() {
if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
+ // Cluster constructor will leave us in either READY or JOINER state.
+ switch (state) {
+ case READY:
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ break;
+ case JOINER:
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ break;
+ default:
+ assert(0);
+ }
+ QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name << " with url=" << myUrl); // Leading space keeps the verb separated from the *this prefix, as the previous " joining cluster " message did.
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
+
// Add finalizer last for exception safety.
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
@@ -344,11 +384,21 @@ void Cluster::deliver(
}
void Cluster::deliverEvent(const Event& e) {
- deliverEventQueue.push(e);
+ // During initialization, execute events directly in the same thread.
+ // Once initialized, push to pollable queue to be processed in another thread.
+ if (state == INIT)
+ deliveredEvent(e);
+ else
+ deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- deliverFrameQueue.push(e);
+ // During initialization, execute events directly in the same thread.
+ // Once initialized, push to pollable queue to be processed in another thread.
+ if (state == INIT)
+ deliveredFrame(e);
+ else
+ deliverFrameQueue.push(e);
}
const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
@@ -524,12 +574,6 @@ void Cluster::configChange (
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- if (state == INIT) {
- // FIXME aconway 2009-11-16: persistent restart
- // Recover only if we are first in cluster.
- broker.setRecovery(nCurrent == 1);
- initialized = true;
- }
QPID_LOG(notice, *this << " membership change: "
<< AddrList(current, nCurrent) << "("
<< AddrList(joined, nJoined, "joined: ")
@@ -544,30 +588,42 @@ void Cluster::configChange (
void Cluster::setReady(Lock&) {
state = READY;
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- mcast.release();
+ mcast.setReady();
broker.getQueueEvents().enable();
}
void Cluster::initMapCompleted(Lock& l) {
+ // Called on completion of the initial status map; acts only while state is still INIT.
if (state == INIT) {
+ // We have status for all members so we can make join decisions.
elders = initMap.getElders();
+ QPID_LOG(debug, *this << " elders: " << elders);
if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
broker.getLinks().setPassive(true);
broker.getQueueEvents().disable();
+ QPID_LOG(info, *this << " not active for links.");
}
+ else {
+ QPID_LOG(info, *this << " active for links.");
+ }
+
setClusterId(initMap.getClusterId(), l);
+ // FIXME aconway 2009-11-20: store id == cluster id.
+ // Clean up redundant copy of id in InitialStatus
+ // Use store ID as advertised cluster ID.
+ // Consistency check on cluster ID vs. locally stored ID.
+ // throw rather than assert in StoreStatus.
+ if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ broker.setRecovery(false); // Ditch my current store.
state = JOINER;
- mcast.mcastControl(
- ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
}
else { // I can go ready.
- QPID_LOG(notice, *this << " ready.");
discarding = false;
setReady(l);
- map = ClusterMap(initMap.getMemberUrls());
memberUpdate(l);
}
+ QPID_LOG(debug, *this << " Initialization complete");
}
}
@@ -587,9 +643,11 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
initMap.configChange(config);
if (initMap.isResendNeeded()) {
mcast.mcastControl(
- // FIXME aconway 2009-11-17: persistent restart, set persistence bit.
- ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId,
- CLUSTER_VERSION, myUrl.str()), self);
+ ClusterInitialStatusBody(
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getStart(), store.getStop()
+ ),
+ self);
}
if (initMap.transitionToComplete()) initMapCompleted(l);
@@ -597,6 +655,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
memberUpdate(l);
if (elders.empty()) {
// We are the oldest, reactive links if necessary
+ QPID_LOG(info, *this << " becoming active for links."); // Stream the Cluster itself, not the address of this.
broker.getLinks().setPassive(false);
}
}
@@ -628,9 +687,11 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
makeOffer(id, l);
}
-void Cluster::initialStatus(const MemberId& member, bool active, bool persistent,
- const framing::Uuid& id, uint32_t version,
- const std::string& url, Lock& l)
+void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
+ const framing::Uuid& id,
+ framing::cluster::StoreState store,
+ const framing::Uuid& start, const framing::Uuid& end,
+ Lock& l)
{
if (version != CLUSTER_VERSION) {
QPID_LOG(critical, *this << " incompatible cluster versions " <<
@@ -640,9 +701,13 @@ void Cluster::initialStatus(const MemberId& member, bool active, bool persistent
}
initMap.received(
member,
- ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url)
+ ClusterInitialStatusBody(
+ ProtocolVersion(), version, active, id, store, start, end)
);
- if (initMap.transitionToComplete()) initMapCompleted(l);
+ if (initMap.transitionToComplete()) {
+ QPID_LOG(debug, *this << " initial status map complete. ");
+ initMapCompleted(l);
+ }
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -650,7 +715,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
memberUpdate(l);
if (state == CATCHUP && id == self) {
setReady(l);
- QPID_LOG(notice, *this << " caught up, active cluster member.");
+ QPID_LOG(notice, *this << " caught up.");
}
}
@@ -770,8 +835,7 @@ void Cluster::updateOutDone(Lock& l) {
QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
- mcast.release();
- deliverEventQueue.start(); // Start processing events again.
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -781,8 +845,10 @@ void Cluster::updateOutError(const std::exception& e) {
updateOutDone(l);
}
-void Cluster ::shutdown(const MemberId& id, Lock& l) {
- QPID_LOG(notice, *this << " received shutdown from " << id);
+void Cluster ::shutdown(const MemberId& , Lock& l) {
+ QPID_LOG(notice, *this << " cluster shut down by administrator.");
+ // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command.
+ if (store.hasStore()) store.clean(Uuid(true));
leave(l);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index aff703c081..c1ee0c2be1 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -19,7 +19,6 @@
*
*/
-#include "InitialStatusMap.h"
#include "ClusterMap.h"
#include "ClusterSettings.h"
#include "Cpg.h"
@@ -29,12 +28,14 @@
#include "EventFrame.h"
#include "ExpiryPolicy.h"
#include "FailoverExchange.h"
+#include "InitialStatusMap.h"
#include "LockedConnectionMap.h"
#include "Multicaster.h"
#include "NoOpConnectionOutputHandler.h"
#include "PollableQueue.h"
#include "PollerDispatch.h"
#include "Quorum.h"
+#include "StoreStatus.h"
#include "UpdateReceiver.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
@@ -147,9 +148,14 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, Lock&);
void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
- void initialStatus(const MemberId&, bool active, bool persistent,
- const framing::Uuid& id, uint32_t version,
- const std::string& url, Lock&);
+ void initialStatus(const MemberId&,
+ uint32_t version,
+ bool active,
+ const framing::Uuid& id,
+ framing::cluster::StoreState,
+ const framing::Uuid& start,
+ const framing::Uuid& end,
+ Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
@@ -228,9 +234,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Quorum quorum;
LockedConnectionMap localConnections;
- // Used only during initialization
- bool initialized;
-
// Used only in deliverEventQueue thread or when stalled for update.
Decoder decoder;
bool discarding;
@@ -259,6 +262,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ConnectionMap connections;
InitialStatusMap initMap;
+ StoreStatus store;
ClusterMap map;
MemberSet elders;
size_t lastSize;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 4eec388866..aab05f8ab4 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -126,6 +126,9 @@ struct ClusterPlugin : public Plugin {
ClusterPlugin() : options(settings), cluster(0) {}
+ // Cluster needs to be initialized after the store
+ int initOrder() const { return Plugin::DEFAULT_INIT_ORDER+500; }
+
Options* getOptions() { return &options; }
void earlyInitialize(Plugin::Target& target) {
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp
index 4831e7eabe..52564990f6 100644
--- a/qpid/cpp/src/qpid/cluster/Event.cpp
+++ b/qpid/cpp/src/qpid/cluster/Event.cpp
@@ -115,16 +115,16 @@ const AMQFrame& Event::getFrame() const {
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
-std::ostream& operator << (std::ostream& o, EventType t) {
+std::ostream& operator<< (std::ostream& o, EventType t) {
return o << EVENT_TYPE_NAMES[t];
}
-std::ostream& operator << (std::ostream& o, const EventHeader& e) {
+std::ostream& operator<< (std::ostream& o, const EventHeader& e) {
return o << "Event[" << e.getConnectionId() << " " << e.getType()
<< " " << e.getSize() << " bytes]";
}
-std::ostream& operator << (std::ostream& o, const Event& e) {
+std::ostream& operator<< (std::ostream& o, const Event& e) {
o << "Event[" << e.getConnectionId() << " ";
if (e.getType() == CONTROL)
o << e.getFrame();
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
index f2251f4043..51d6140008 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -19,15 +19,17 @@
*
*/
#include "InitialStatusMap.h"
+#include "StoreStatus.h"
#include <algorithm>
#include <boost/bind.hpp>
-using namespace std;
-using namespace boost;
-
namespace qpid {
namespace cluster {
+using namespace std;
+using namespace boost;
+using namespace framing::cluster;
+
InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
: self(self_), completed(), resendNeeded(), size(size_)
{}
@@ -78,10 +80,6 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) {
return !v.second;
}
-bool InitialStatusMap::isActive(const Map::value_type& v) {
- return v.second && v.second->getActive();
-}
-
bool InitialStatusMap::isComplete() {
return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
&& (map.size() >= size);
@@ -97,10 +95,35 @@ bool InitialStatusMap::isResendNeeded() {
return ret;
}
+bool InitialStatusMap::isActive(const Map::value_type& v) {
+ return v.second && v.second->getActive();
+}
+
+bool InitialStatusMap::hasStore(const Map::value_type& v) {
+ if (!v.second) return false; // No initial status recorded for this member.
+ if (v.second->getStoreState() == STORE_STATE_CLEAN_STORE) return true;
+ return v.second->getStoreState() == STORE_STATE_DIRTY_STORE;
+}
+
bool InitialStatusMap::isUpdateNeeded() {
+ // FIXME aconway 2009-11-20: consistency checks isComplete or here?
assert(isComplete());
- // If there are any active members we need an update.
- return find_if(map.begin(), map.end(), &isActive) != map.end();
+ // We need an update if there are any active members.
+ if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
+
+ // Otherwise it depends on store status, get my own status:
+ Map::iterator me = map.find(self);
+ assert(me != map.end());
+ assert(me->second);
+ switch (me->second->getStoreState()) {
+ case STORE_STATE_NO_STORE:
+ case STORE_STATE_EMPTY_STORE:
+ // If anybody has a store then we need an update.
+ return find_if(map.begin(), map.end(), &hasStore) != map.end();
+ case STORE_STATE_DIRTY_STORE: return true;
+ case STORE_STATE_CLEAN_STORE: return false; // Use our own store
+ }
+ return false;
}
MemberSet InitialStatusMap::getElders() {
@@ -125,15 +148,4 @@ framing::Uuid InitialStatusMap::getClusterId() {
return map.begin()->second->getClusterId();
}
-std::map<MemberId, Url> InitialStatusMap::getMemberUrls() {
- assert(isComplete());
- assert(!isUpdateNeeded());
- std::map<MemberId, Url> urlMap;
- for (Map::iterator i = map.begin(); i != map.end(); ++i) {
- assert(i->second);
- urlMap.insert(std::make_pair(i->first, i->second->getUrl()));
- }
- return urlMap;
-}
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
index 9e9b71e363..72963ea2bb 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -57,15 +57,11 @@ class InitialStatusMap
/**@pre isComplete(). @return Cluster-wide cluster ID. */
framing::Uuid getClusterId();
- /**@pre isComplete() && !isUpdateNeeded().
- *@return member->URL map for all members.
- */
- std::map<MemberId, Url> getMemberUrls();
-
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
static bool isActive(const Map::value_type&);
+ static bool hasStore(const Map::value_type&);
void check();
Map map;
MemberSet firstConfig;
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index 72fc1533f8..229d7edb1e 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -33,36 +33,41 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true)
+ ready(false)
{
queue.start();
}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
- QPID_LOG(trace, "MCAST " << id << ": " << body);
mcast(Event::control(body, id));
}
void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) {
- QPID_LOG(trace, "MCAST " << id << ": " << frame);
mcast(Event::control(frame, id));
}
void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
- QPID_LOG(trace, "MCAST " << e);
mcast(e);
}
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- if (e.isConnection() && holding) {
- holdingQueue.push_back(e);
+ if (!ready) {
+ if (e.isConnection()) holdingQueue.push_back(e);
+ else {
+ iovec iov = e.toIovec();
+ // FIXME aconway 2009-11-23: configurable retry --cluster-retry
+ if (!cpg.mcast(&iov, 1))
+ throw Exception("CPG flow control error during initialization");
+ QPID_LOG(trace, "MCAST (direct) " << e);
+ }
return;
}
}
+ QPID_LOG(trace, "MCAST " << e);
queue.push(e);
}
@@ -88,9 +93,9 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co
}
}
-void Multicaster::release() {
+void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
- holding = false;
+ ready = true;
std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1));
holdingQueue.clear();
}
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h
index c1a0ddffc6..2db84a9ce0 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.h
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.h
@@ -41,11 +41,21 @@ class Cpg;
/**
* Multicast to the cluster. Shared, thread safe object.
+ *
+ * Runs in two modes:
+ *
+ * initializing: Hold connection mcast events. Multicast cluster
+ * events directly in the calling thread. This mode is used before
+ * joining the cluster where the poller may not yet be active and we
+ * want to hold any connection traffic till we join.
+ *
+ * ready: normal operation. Queues all mcasts on a pollable queue,
+ * multicasts connection and cluster events.
*/
class Multicaster
{
public:
- /** Starts in holding mode: connection data events are held, other events are mcast */
+ /** Starts in initializing mode. */
Multicaster(Cpg& cpg_,
const boost::shared_ptr<sys::Poller>&,
boost::function<void()> onError
@@ -54,9 +64,10 @@ class Multicaster
void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
- /** End holding mode, held events are mcast */
- void release();
-
+
+ /** Switch to ready mode. */
+ void setReady();
+
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
@@ -67,7 +78,7 @@ class Multicaster
boost::function<void()> onError;
Cpg& cpg;
PollableEventQueue queue;
- bool holding;
+ bool ready;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
};
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
new file mode 100644
index 0000000000..1c5f581ea1
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "StoreStatus.h"
+#include "qpid/Exception.h"
+#include <boost/filesystem/path.hpp>
+#include <boost/filesystem/fstream.hpp>
+#include <boost/filesystem/operations.hpp>
+#include <fstream>
+
+namespace qpid {
+namespace cluster {
+
+using framing::Uuid;
+using namespace framing::cluster;
+using namespace boost::filesystem;
+
+StoreStatus::StoreStatus(const std::string& d)
+ : state(STORE_STATE_NO_STORE), dataDir(d)
+{}
+
+namespace {
+
+const char* SUBDIR="cluster"; // Sub-directory of the data directory holding the status files.
+const char* START_FILE="start"; // File holding the start UUID.
+const char* STOP_FILE="stop"; // File holding the stop UUID.
+// Read the UUID stored in the file at path; a missing file yields a default-constructed Uuid.
+Uuid loadUuid(const path& path) {
+ Uuid ret;
+ if (exists(path)) {
+ ifstream i(path);
+ i >> ret; // NOTE(review): a failed read is not reported; presumably ret is then left unset - confirm.
+ }
+ return ret;
+}
+// Write uuid to the file at path. Throws qpid::Exception if the file cannot be opened or written.
+void saveUuid(const path& path, const Uuid& uuid) {
+ ofstream o(path);
+ if (!(o << uuid << std::flush)) throw Exception("Cannot write cluster store status file " + path.string()); // Flush so a write error is detected here rather than lost when the stream is destroyed.
+}
+
+} // namespace
+
+
+void StoreStatus::load() {
+ // Read the persisted start/stop UUIDs, creating the status directory if it is absent.
+ const path statusDir = path(dataDir)/SUBDIR;
+ create_directory(statusDir);
+ start = loadUuid(statusDir/START_FILE);
+ stop = loadUuid(statusDir/STOP_FILE);
+ // No start: empty store. Start without stop: dirty. Start and stop: clean.
+ if (!start) state = STORE_STATE_EMPTY_STORE;
+ else state = stop ? STORE_STATE_CLEAN_STORE : STORE_STATE_DIRTY_STORE;
+}
+
+void StoreStatus::save() { // Write start and stop to their files under <dataDir>/cluster.
+ const path statusDir = path(dataDir)/SUBDIR;
+ create_directory(statusDir); // The status files live in a sub-directory that may not exist yet.
+ saveUuid(statusDir/START_FILE, start);
+ saveUuid(statusDir/STOP_FILE, stop);
+}
+
+void StoreStatus::dirty(const Uuid& start_) { // Mark the store as in use, recording start_ as its start UUID.
+ start = start_;
+ stop = Uuid(); // Reset stop to a default-constructed Uuid: no stop is recorded while the store is in use.
+ state = STORE_STATE_DIRTY_STORE;
+ save(); // Write the new start/stop pair to the status files.
+}
+
+void StoreStatus::clean(const Uuid& stop_) { // Mark the store as cleanly stopped, recording stop_ as its stop UUID.
+ assert(start); // FIXME aconway 2009-11-20: exception?
+ assert(stop_); // A stop UUID that tests false (null) is rejected.
+ state = STORE_STATE_CLEAN_STORE;
+ stop = stop_;
+ save(); // Write the start/stop pair to the status files.
+}
+
+}} // namespace qpid::cluster
+
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
new file mode 100644
index 0000000000..b4c6bda480
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -0,0 +1,61 @@
+#ifndef QPID_CLUSTER_STORESTATE_H
+#define QPID_CLUSTER_STORESTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/enum.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * State of the store for cluster purposes.
+ *
+ * Holds the store state together with a start and a stop UUID, and
+ * can load them from / save them to files under the data directory
+ * given at construction.
+ */
+class StoreStatus
+{
+  public:
+    typedef framing::Uuid Uuid;
+    typedef framing::cluster::StoreState StoreState;
+
+    /** @param dir data directory under which the status files are kept. */
+    StoreStatus(const std::string& dir);
+
+    StoreState getState() const { return state; }
+    Uuid getStart() const { return start; }
+    Uuid getStop() const { return stop; }
+
+    void dirty(const Uuid& start);  // Start using the store.
+    void clean(const Uuid& stop);   // Stop using the store.
+
+    /** Read the UUIDs from disk and derive the state from them. */
+    void load();
+    /** Write the current UUIDs to disk. */
+    void save();
+
+    /** True for every state other than NO_STORE. */
+    bool hasStore() const { return state != framing::cluster::STORE_STATE_NO_STORE; }
+    /** True only in the EMPTY_STORE state. */
+    bool isEmpty() const { return state == framing::cluster::STORE_STATE_EMPTY_STORE; }
+  private:
+    StoreState state;
+    Uuid start, stop;
+    std::string dataDir;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_STORESTATE_H*/
diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp
index f7c13ad8d4..432c7ab94e 100644
--- a/qpid/cpp/src/qpid/framing/Uuid.cpp
+++ b/qpid/cpp/src/qpid/framing/Uuid.cpp
@@ -57,7 +57,7 @@ void Uuid::clear() {
}
// Force int 0/!0 to false/true; avoids compile warnings.
-bool Uuid::isNull() {
+bool Uuid::isNull() const {
return !!uuid_is_null(data());
}
diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp
index c3587965e5..e6a3ec1620 100644
--- a/qpid/cpp/src/tests/InitialStatusMap.cpp
+++ b/qpid/cpp/src/tests/InitialStatusMap.cpp
@@ -26,6 +26,7 @@
using namespace std;
using namespace qpid::cluster;
using namespace qpid::framing;
+using namespace qpid::framing::cluster;
using namespace boost::assign;
namespace qpid {
@@ -35,8 +36,19 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite)
typedef InitialStatusMap::Status Status;
-Status activeStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), true, false, id, 0, ""); }
-Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), false, false, id, 0, ""); }
+/** Status of an active member with no store, carrying cluster-id `id`. */
+Status activeStatus(const Uuid& id=Uuid()) {
+    const bool active = true;
+    return Status(ProtocolVersion(), 0, active, id,
+                  STORE_STATE_NO_STORE, Uuid(), Uuid());
+}
+
+/** Status of a non-active member with no store, carrying cluster-id `id`. */
+Status newcomerStatus(const Uuid& id=Uuid()) {
+    const bool active = false;
+    return Status(ProtocolVersion(), 0, active, id,
+                  STORE_STATE_NO_STORE, Uuid(), Uuid());
+}
+
+/**
+ * Status with the given active flag, store state and start/stop UUIDs;
+ * the cluster-id is a default-constructed Uuid.
+ */
+Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) {
+    const Uuid noClusterId = Uuid();
+    return Status(ProtocolVersion(), 0, active, noClusterId, state, start, stop);
+}
QPID_AUTO_TEST_CASE(testFirstInCluster) {
// Single member is first in cluster.
@@ -173,6 +185,66 @@ QPID_AUTO_TEST_CASE(testInitialSize) {
BOOST_CHECK(map.isComplete());
}
+QPID_AUTO_TEST_CASE(testAllCleanNoUpdate) {
+    // Three non-active members all report a clean store: no update needed.
+    InitialStatusMap map(MemberId(0), 3);
+    map.configChange(list_of<MemberId>(0)(1)(2));
+    const Status clean = storeStatus(false, STORE_STATE_CLEAN_STORE);
+    map.received(MemberId(0), clean);
+    map.received(MemberId(1), clean);
+    map.received(MemberId(2), clean);
+    BOOST_CHECK(!map.isUpdateNeeded());
+}
+
+QPID_AUTO_TEST_CASE(testAllEmptyNoUpdate) {
+    // Three non-active members all report an empty store: the map is
+    // complete and no update is needed.
+    InitialStatusMap map(MemberId(0), 3);
+    map.configChange(list_of<MemberId>(0)(1)(2));
+    const Status empty = storeStatus(false, STORE_STATE_EMPTY_STORE);
+    map.received(MemberId(0), empty);
+    map.received(MemberId(1), empty);
+    map.received(MemberId(2), empty);
+    BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(!map.isUpdateNeeded());
+}
+
+QPID_AUTO_TEST_CASE(testAllNoStoreNoUpdate) {
+    // Three non-active members, none with a store: the map is complete
+    // and no update is needed.
+    InitialStatusMap map(MemberId(0), 3);
+    map.configChange(list_of<MemberId>(0)(1)(2));
+    const Status noStore = storeStatus(false, STORE_STATE_NO_STORE);
+    map.received(MemberId(0), noStore);
+    map.received(MemberId(1), noStore);
+    map.received(MemberId(2), noStore);
+    BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(!map.isUpdateNeeded());
+}
+
+QPID_AUTO_TEST_CASE(testDirtyNeedUpdate) {
+    // Member 0 (the map's own id) is dirty while the other two are clean:
+    // an update is needed.
+    InitialStatusMap map(MemberId(0), 3);
+    map.configChange(list_of<MemberId>(0)(1)(2));
+    const Status dirty = storeStatus(false, STORE_STATE_DIRTY_STORE);
+    const Status clean = storeStatus(false, STORE_STATE_CLEAN_STORE);
+    map.received(MemberId(0), dirty);
+    map.received(MemberId(1), clean);
+    map.received(MemberId(2), clean);
+    BOOST_CHECK(map.transitionToComplete());
+    BOOST_CHECK(map.isUpdateNeeded());
+}
+
+QPID_AUTO_TEST_CASE(testEmptyNeedUpdate) {
+    // Member 0 (the map's own id) is empty while the other two are clean:
+    // an update is needed.
+    InitialStatusMap map(MemberId(0), 3);
+    map.configChange(list_of<MemberId>(0)(1)(2));
+    const Status empty = storeStatus(false, STORE_STATE_EMPTY_STORE);
+    const Status clean = storeStatus(false, STORE_STATE_CLEAN_STORE);
+    map.received(MemberId(0), empty);
+    map.received(MemberId(1), clean);
+    map.received(MemberId(2), clean);
+    BOOST_CHECK(map.transitionToComplete());
+    BOOST_CHECK(map.isUpdateNeeded());
+}
+
+QPID_AUTO_TEST_CASE(testEmptyAlone) {
+    // Single member with an empty store: completes without needing an update.
+    InitialStatusMap map(MemberId(0), 1);
+    map.configChange(list_of<MemberId>(0));
+    const Status empty = storeStatus(false, STORE_STATE_EMPTY_STORE);
+    map.received(MemberId(0), empty);
+    BOOST_CHECK(map.transitionToComplete());
+    BOOST_CHECK(!map.isUpdateNeeded());
+}
+
+// FIXME aconway 2009-11-20: consistency tests for mixed stores,
+// tests for manual intervention case.
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/StoreStatus.cpp b/qpid/cpp/src/tests/StoreStatus.cpp
new file mode 100644
index 0000000000..37ba19e34a
--- /dev/null
+++ b/qpid/cpp/src/tests/StoreStatus.cpp
@@ -0,0 +1,109 @@
+ /*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/cluster/StoreStatus.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/filesystem/operations.hpp>
+
+using namespace std;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::framing::cluster;
+using namespace boost::assign;
+using namespace boost::filesystem;
+
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(StoreStatusTestSuite)
+
+// Scratch directory created and removed by each test case below.
+const char* const TEST_DIR = "StoreStatus.tmp";
+
+QPID_AUTO_TEST_CASE(testLoadEmpty) {
+    // Clear out anything left behind by an earlier aborted run: stale
+    // start/stop files would make the store look dirty or clean here.
+    remove_all(TEST_DIR);
+    create_directory(TEST_DIR);
+    StoreStatus ss(TEST_DIR);
+    // Before load(): no store, no UUIDs.
+    BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_NO_STORE);
+    BOOST_CHECK(!ss.getStart());
+    BOOST_CHECK(!ss.getStop());
+    // Loading from a directory with no status files gives an empty store.
+    ss.load();
+    BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_EMPTY_STORE);
+    BOOST_CHECK(!ss.getStart());
+    BOOST_CHECK(!ss.getStop());
+    remove_all(TEST_DIR);
+}
+
+QPID_AUTO_TEST_CASE(testSaveLoadDirty) {
+    // Mark a store dirty, then check a second instance reloads that state.
+    create_directory(TEST_DIR);
+    const Uuid start(true);
+    StoreStatus ss(TEST_DIR);
+    ss.load();
+    ss.dirty(start);
+    BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE);
+
+    // Fresh object over the same directory sees what was saved.
+    StoreStatus ss2(TEST_DIR);
+    ss2.load();
+    BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE);
+    BOOST_CHECK_EQUAL(ss2.getStart(), start);
+    BOOST_CHECK(!ss2.getStop());
+    remove_all(TEST_DIR);
+}
+
+QPID_AUTO_TEST_CASE(testSaveLoadClean) {
+    // Mark a store dirty then clean, and check a second instance reloads
+    // the clean state with both UUIDs.
+    create_directory(TEST_DIR);
+    const Uuid start(true);
+    const Uuid stop(true);
+    StoreStatus ss(TEST_DIR);
+    ss.load();
+    ss.dirty(start);
+    ss.clean(stop);
+    BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE);
+
+    // Fresh object over the same directory sees what was saved.
+    StoreStatus ss2(TEST_DIR);
+    ss2.load();
+    BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_CLEAN_STORE);
+    BOOST_CHECK_EQUAL(ss2.getStart(), start);
+    BOOST_CHECK_EQUAL(ss2.getStop(), stop);
+    remove_all(TEST_DIR);
+}
+
+QPID_AUTO_TEST_CASE(testMarkDirty) {
+    // Take a store through dirty -> clean -> dirty and check that the
+    // reloaded status is dirty with the stop UUID reset.
+    create_directory(TEST_DIR);
+    const Uuid start(true);
+    const Uuid stop(true);
+    StoreStatus ss(TEST_DIR);
+    ss.load();
+    ss.dirty(start);
+    ss.clean(stop);
+    ss.dirty(start);
+
+    // Fresh object over the same directory sees what was saved.
+    StoreStatus ss2(TEST_DIR);
+    ss2.load();
+    BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE);
+    BOOST_CHECK_EQUAL(ss2.getStart(), start);
+    BOOST_CHECK(!ss2.getStop());
+    remove_all(TEST_DIR);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index f33f87ee62..20053788e4 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -76,7 +76,8 @@ cluster_test_SOURCES = \
ForkedBroker.cpp \
PartialFailure.cpp \
ClusterFailover.cpp \
- InitialStatusMap.cpp
+ InitialStatusMap.cpp \
+ StoreStatus.cpp
cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index ed39277f77..65c91b1d81 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,13 +18,18 @@
# under the License.
#
-import os, signal, sys, time
+import os, signal, sys, time, imp
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
from qpid.messaging import Message
from threading import Thread
+from logging import getLogger
+log = getLogger("qpid.cluster_tests")
+
+# Import scripts as modules
+qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
class ShortTests(BrokerTest):
"""Short cluster functionality tests."""
@@ -34,8 +39,8 @@ class ShortTests(BrokerTest):
# Start a cluster, send some messages to member 0.
cluster = self.cluster(2)
s0 = cluster[0].connect().session()
- s0.sender("q; {create:always}").send(messaging.Message("x"))
- s0.sender("q; {create:always}").send(messaging.Message("y"))
+ s0.sender("q; {create:always}").send(Message("x"))
+ s0.sender("q; {create:always}").send(Message("y"))
s0.connection.close()
# Verify messages available on member 1.
@@ -52,35 +57,6 @@ class ShortTests(BrokerTest):
self.assertEqual("y", m.content)
s2.connection.close()
- def test_cluster_size(self):
- """Verify cluster startup waits for N brokers if --cluster-size=N"""
- class ConnectThread(Thread):
- def __init__(self, broker):
- Thread.__init__(self)
- self.broker=broker
- self.connected = False
- self.error = None
-
- def run(self):
- try:
- self.broker.connect()
- self.connected = True
- except Exception, e: self.error = RethrownException(e)
-
- cluster = self.cluster(1, args=["--cluster-size=3"], wait_for_start=False)
- c = ConnectThread(cluster[0])
- c.start()
- time.sleep(.01)
- assert not c.connected
- cluster.start(wait_for_start=False)
- time.sleep(.01)
- assert not c.connected
- cluster.start(wait_for_start=False)
- c.join(1)
- assert not c.isAlive() # Join didn't time out
- assert c.connected
- if c.error: raise c.error
-
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -120,20 +96,22 @@ class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
"""
- args = ["--load-module",BrokerTest.store_lib]
+ def args(self):
+ assert BrokerTest.store_lib
+ return ["--load-module", BrokerTest.store_lib]
def test_store_loaded(self):
"""Ensure we are indeed loading a working store"""
- broker = self.broker(self.args, name="recoverme", expect=EXPECT_EXIT_FAIL)
- m = messaging.Message("x", durable=True)
+ broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL)
+ m = Message("x", durable=True)
broker.send_message("q", m)
broker.kill()
- broker = self.broker(self.args, name="recoverme")
+ broker = self.broker(self.args(), name="recoverme")
self.assertEqual("x", broker.get_message("q").content)
def test_kill_restart(self):
"""Verify we can kill/restart a broker with store in a cluster"""
- cluster = self.cluster(1, self.args)
+ cluster = self.cluster(1, self.args())
cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill()
# Send a message, retrieve from the restarted broker
@@ -141,19 +119,42 @@ class StoreTests(BrokerTest):
m = cluster.start("restartme").get_message("q")
self.assertEqual("x", m.content)
- def test_total_shutdown(self):
- """Test we use the correct store to recover after total shutdown"""
- cluster = self.cluster(2, args=self.args, expect=EXPECT_EXIT_FAIL)
- cluster[0].send_message("q", Message("a", durable=True))
- cluster[0].kill()
- self.assertEqual("a", cluster[1].get_message("q").content)
- cluster[1].send_message("q", Message("b", durable=True))
- cluster[1].kill()
-
- # Start 1 first, we should see its store used.
- cluster.start(name=cluster.name+"-1")
- cluster.start(name=cluster.name+"-0")
- self.assertEqual("b", cluster[2].get_message("q").content)
-
+ def test_persistent_restart(self):
+ """Verify persistent cluster shutdown/restart scenarios"""
+ cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True)
+ a.send_message("q", Message("1", durable=True))
+ # Kill & restart one member.
+ c.kill()
+ self.assertEqual(a.get_message("q").content, "1")
+ a.send_message("q", Message("2", durable=True))
+ c = cluster.start("c", expect=EXPECT_EXIT_OK)
+ self.assertEqual(c.get_message("q").content, "2")
+ # Shut down the entire cluster cleanly and bring it back up
+ a.send_message("q", Message("3", durable=True))
+ qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()])
+ a = cluster.start("a", wait_for_start=False)
+ b = cluster.start("b", wait_for_start=False)
+ c = cluster.start("c", wait_for_start=True)
+ self.assertEqual(a.get_message("q").content, "3")
+
+ def test_persistent_partial_failure(self):
+ # Kill 2 members, shut down the last cleanly then restart
+ # Ensure we use the clean database
+ cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True)
+ a.send_message("q", Message("4", durable=True))
+ a.kill()
+ b.kill()
+ self.assertEqual(c.get_message("q").content, "4")
+ c.send_message("q", Message("clean", durable=True))
+ qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()])
+ a = cluster.start("a", wait_for_start=False)
+ b = cluster.start("b", wait_for_start=False)
+ c = cluster.start("c", wait_for_start=True)
+ self.assertEqual(a.get_message("q").content, "clean")
-
diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test
index 4f13b4672c..49d788f41e 100755
--- a/qpid/cpp/src/tests/clustered_replication_test
+++ b/qpid/cpp/src/tests/clustered_replication_test
@@ -54,10 +54,10 @@ if test -d $PYTHON_DIR; then
. $srcdir/ais_check
#todo: these cluster names need to be unique to prevent clashes
- PRIMARY_CLUSTER=PRIMARY_$(hostname)_$(pwd)
- DR_CLUSTER=DR_$(hostname)_$(pwd)
+ PRIMARY_CLUSTER=PRIMARY_$(hostname)_$$
+ DR_CLUSTER=DR_$(hostname)_$$
- GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-enable notice+ --log-to-stderr false"
+ GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0 --log-to-stderr false"
PRIMARY_OPTS="--load-module ../.libs/replicating_listener.so --create-replication-queue true --replication-queue REPLICATION_QUEUE --load-module ../.libs/cluster.so --cluster-name $PRIMARY_CLUSTER"
DR_OPTS="--load-module ../.libs/replication_exchange.so --load-module ../.libs/cluster.so --cluster-name $DR_CLUSTER"
diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests
index 9546ddf938..b6c144bb05 100755
--- a/qpid/cpp/src/tests/run_cluster_tests
+++ b/qpid/cpp/src/tests/run_cluster_tests
@@ -37,7 +37,6 @@ mkdir -p $OUTDIR
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
-set -x
with_ais_group $TEST_EXEC -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
rm -rf $OUTDIR
#exit 0
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 209143b31b..2c3502ffee 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -47,13 +47,25 @@
<field name="updatee" type="uint64"/>
</control>
- <!-- Status exchanged when new members join the cluster -->
- <control name="initial-status" code="0x4">
+ <!-- Possible states for persistent store -->
+ <domain name="store-state" type="uint8">
+ <enum>
+ <choice name="no-store" value="0"/>
+ <choice name="empty-store" value="1"/>
+ <choice name="clean-store" value="2"/>
+ <choice name="dirty-store" value="3"/>
+ </enum>
+ </domain>
+
+ <!-- Status exchanged when new members join the cluster. -->
+ <control name="initial-status" code="0x5">
+ <field name="version" type="uint32"/>
<field name="active" type="bit"/>
- <field name="persistent" type="bit"/>
<field name="cluster-id" type="uuid"/>
- <field name="version" type="uint32"/>
- <field name="url" type="str16"/>>
+ <!-- Related to persistent store -->
+ <field name="store-state" type="store-state"/>
+ <field name="start-uuid" type="uuid"/>
+ <field name="stop-uuid" type="uuid"/>
</control>
<!-- New member or updater is ready as an active member. -->