Diffstat (limited to 'cpp')
-rw-r--r--  cpp/src/qpid/cluster/Cluster.cpp     |  6
-rw-r--r--  cpp/src/qpid/cluster/StoreStatus.cpp |  6
-rw-r--r--  cpp/src/qpid/cluster/StoreStatus.h   |  6
-rwxr-xr-x  cpp/src/tests/cluster_tests.py       | 54
4 files changed, 55 insertions(+), 17 deletions(-)
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e718819f48..08646e5a6b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -914,6 +914,12 @@ void Cluster::memberUpdate(Lock& l) {
     size_t size = urls.size();
     failoverExchange->updateUrls(urls);
 
+    if (store.hasStore()) {
+        // Mark store clean if I am the only broker, dirty otherwise.
+        if (size == 1) store.clean(Uuid(true));
+        else store.dirty(clusterId);
+    }
+
     if (size == 1 && lastSize > 1 && state >= CATCHUP) {
         QPID_LOG(notice, *this << " last broker standing, update queue policies");
         lastBroker = true;
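
The hunk above hooks store marking into membership updates: whenever the membership changes, a broker that has a store re-stamps it, clean with a freshly generated shutdown UUID if it is the sole member, dirty with the cluster's UUID otherwise. The standalone sketch below isolates that decision rule; MiniStoreStatus, onMemberUpdate and the toy Uuid are illustrative stand-ins, not the real qpid::cluster types.

    #include <iostream>
    #include <string>

    // Toy stand-in; in qpid, Uuid(true) generates a fresh random UUID
    // while Uuid() is the null UUID (an assumption mirrored here).
    struct Uuid {
        std::string value;
        Uuid() {}
        explicit Uuid(bool make) { if (make) value = "fresh-random-uuid"; }
    };

    enum StoreState { NO_STORE, EMPTY_STORE, CLEAN_STORE, DIRTY_STORE };

    struct MiniStoreStatus {
        StoreState state = EMPTY_STORE;
        Uuid clusterId, shutdownId;
        bool hasStore() const { return state != NO_STORE; }
        void dirty(const Uuid& id) {
            if (!hasStore()) return;
            clusterId = id; shutdownId = Uuid(); state = DIRTY_STORE;
        }
        void clean(const Uuid& id) {
            if (!hasStore()) return;
            shutdownId = id; state = CLEAN_STORE;
        }
    };

    // Mirrors the memberUpdate rule: sole member => clean, otherwise dirty.
    void onMemberUpdate(MiniStoreStatus& store, size_t size, const Uuid& clusterId) {
        if (store.hasStore()) {
            if (size == 1) store.clean(Uuid(true));
            else store.dirty(clusterId);
        }
    }

    int main() {
        MiniStoreStatus store;
        Uuid clusterId(true);
        onMemberUpdate(store, 3, clusterId);  // peers present: store is dirty
        std::cout << (store.state == DIRTY_STORE) << "\n";  // prints 1
        onMemberUpdate(store, 1, clusterId);  // last man standing: store is clean
        std::cout << (store.state == CLEAN_STORE) << "\n";  // prints 1
    }
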
diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp
index 947f81d596..648fcfbbd5 100644
--- a/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -114,7 +114,12 @@ void StoreStatus::save() {
     }
 }
 
+bool StoreStatus::hasStore() const {
+    return state != framing::cluster::STORE_STATE_NO_STORE;
+}
+
 void StoreStatus::dirty(const Uuid& clusterId_) {
+    if (!hasStore()) return;
     assert(clusterId_);
     clusterId = clusterId_;
     shutdownId = Uuid();
@@ -123,6 +128,7 @@ void StoreStatus::dirty(const Uuid& clusterId_) {
 }
 
 void StoreStatus::clean(const Uuid& shutdownId_) {
+    if (!hasStore()) return;
     assert(shutdownId_);
     state = STORE_STATE_CLEAN_STORE;
     shutdownId = shutdownId_;
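
Both dirty() and clean() now begin with the same guard, so on a transient broker (one running with no store at all) they degrade to no-ops instead of pushing the status through an impossible transition. A minimal self-contained illustration of the guard pattern follows; STORE_STATE_NO_STORE and STORE_STATE_CLEAN_STORE appear in the patch, while the EMPTY/DIRTY values are assumed by analogy.

    #include <cassert>

    enum StoreState { STORE_STATE_NO_STORE, STORE_STATE_EMPTY_STORE,
                      STORE_STATE_CLEAN_STORE, STORE_STATE_DIRTY_STORE };

    struct Status {
        StoreState state;
        bool hasStore() const { return state != STORE_STATE_NO_STORE; }
        void dirty() { if (!hasStore()) return; state = STORE_STATE_DIRTY_STORE; }
        void clean() { if (!hasStore()) return; state = STORE_STATE_CLEAN_STORE; }
    };

    int main() {
        Status transientBroker{STORE_STATE_NO_STORE};
        transientBroker.dirty();  // no-op: nothing to mark
        transientBroker.clean();  // no-op: nothing to mark
        assert(transientBroker.state == STORE_STATE_NO_STORE);

        Status persistentBroker{STORE_STATE_EMPTY_STORE};
        persistentBroker.dirty();
        assert(persistentBroker.state == STORE_STATE_DIRTY_STORE);
        persistentBroker.clean();
        assert(persistentBroker.state == STORE_STATE_CLEAN_STORE);
        return 0;
    }
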
diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h
index 911b3a2ba2..2371f0424e 100644
--- a/cpp/src/qpid/cluster/StoreStatus.h
+++ b/cpp/src/qpid/cluster/StoreStatus.h
@@ -46,14 +46,14 @@ class StoreStatus
     const Uuid& getShutdownId() const { return shutdownId; }
     framing::SequenceNumber getConfigSeq() const { return configSeq; }
 
-    void dirty(const Uuid& start); // Start using the store.
-    void clean(const Uuid& stop); // Stop using the store.
+    void dirty(const Uuid& clusterId); // Mark the store in use by clusterId.
+    void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId.
 
     void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number.
 
     void load();
     void save();
-    bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
+    bool hasStore() const;
 
   private:
     framing::cluster::StoreState state;
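
Besides documenting the dirty()/clean() parameters, the header change makes hasStore() const and moves its body out of line. The const qualifier is what lets read-only code query the status; a toy illustration, with Status standing in for the real class:

    #include <iostream>

    struct Status {
        int state;
        bool hasStore() const { return state != 0; }  // const: usable below
    };

    // Read-only observers can call hasStore() through a const reference.
    void report(const Status& s) {
        std::cout << (s.hasStore() ? "store configured" : "no store") << "\n";
    }

    int main() { report(Status{1}); }
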
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index b3274b1b1e..22b7c8f5b8 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -305,21 +305,47 @@ class StoreTests(BrokerTest):
         self.assertRaises(Exception, lambda: a.ready())
         self.assertRaises(Exception, lambda: b.ready())
 
-    def test_total_failure(self):
-        # Verify we abort with sutiable error message if no clean stores.
-        cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
-        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
-        a.kill()
-        b.kill()
-        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
-        self.assertRaises(Exception, lambda: a.ready())
-        self.assertRaises(Exception, lambda: b.ready())
+    def assert_dirty_store(self, broker):
+        self.assertRaises(Exception, lambda: broker.ready())
         msg = re.compile("critical.*no clean store")
-        assert msg.search(readfile(a.log))
-        assert msg.search(readfile(b.log))
+        assert msg.search(readfile(broker.log))
+
+    def test_solo_store_clean(self):
+        # A single-node cluster should always leave a clean store.
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+        a.send_message("q", Message("x", durable=True))
+        a.kill()
+        a = cluster.start("a")
+        self.assertEqual(a.get_message("q").content, "x")
+
+    def test_last_store_clean(self):
+
+        # Verify that only the last node in a cluster to shut down
+        # has a clean store. Start with a cluster of 3, reduce it to
+        # 1, then grow it again, to ensure that a node that was alone
+        # at some point but did not finish as the last node does not
+        # get a clean store.
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
+        c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+        a.send_message("q", Message("x", durable=True))
+        a.kill()
+        b.kill()        # c is the last man standing.
+        time.sleep(0.1) # Pause for c to find out it's last.
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c is no longer last.
+        c.kill()        # Now a is the last man standing.
+        time.sleep(0.1) # Pause for a to find out it's last.
+        a.kill()        # Really the last this time.
+        # b and c should be dirty.
+        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+        self.assert_dirty_store(b)
+        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+        self.assert_dirty_store(c)
+        # a should be clean.
+        a = cluster.start("a")
+        self.assertEqual(a.get_message("q").content, "x")
-        # FIXME aconway 2009-12-03: verify manual restore procedure
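
The rewritten tests pin down the invariant: exactly the last broker to leave keeps a clean store, and a dirty broker restarted without a clean peer must abort with the "critical ... no clean store" message that assert_dirty_store greps for. A rough sketch of the start-up decision those tests exercise follows; it mirrors the intent only, not qpid's actual recovery code, and assumes a broker can tell whether any peer holds a clean store.

    #include <iostream>
    #include <stdexcept>

    enum StoreState { NO_STORE, EMPTY_STORE, CLEAN_STORE, DIRTY_STORE };

    // Hypothetical start-up check: a dirty store is only recoverable if
    // some cluster member has a clean store to update everyone else from.
    void checkRecoverable(StoreState myStore, bool cleanPeerExists) {
        if (myStore == DIRTY_STORE && !cleanPeerExists) {
            std::cerr << "critical: no clean store to recover from" << std::endl;
            throw std::runtime_error("no clean store");
        }
    }

    int main() {
        checkRecoverable(CLEAN_STORE, false);     // last man standing restarts fine
        checkRecoverable(DIRTY_STORE, true);      // a clean peer brings me up to date
        try {
            checkRecoverable(DIRTY_STORE, false); // total failure: abort
        } catch (const std::exception& e) {
            std::cout << "broker aborted: " << e.what() << "\n";
        }
    }
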