diff options
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.cpp | 124 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Uuid.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/StoreStatus.cpp | 10 | ||||
-rw-r--r-- | cpp/src/tests/Uuid.cpp | 24 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 39 | ||||
-rw-r--r-- | cpp/src/tests/testagent.mk | 2 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 7 |
9 files changed, 134 insertions, 110 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0520503ad4..b1afeb6c7c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -297,8 +297,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - if (store.getClusterId()) - clusterId = store.getClusterId(); // Use stored ID if there is one. + clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); @@ -626,7 +625,6 @@ void Cluster::initMapCompleted(Lock& l) { QPID_LOG(info, *this << " not active for links."); } setClusterId(initMap.getClusterId(), l); - if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. @@ -919,7 +917,7 @@ void Cluster::updateOutError(const std::exception& e) { void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { QPID_LOG(notice, *this << " cluster shut down by administrator."); - if (store.hasStore()) store.clean(Uuid(id)); + if (store.hasStore()) store.clean(id); leave(l); } @@ -967,9 +965,16 @@ void Cluster::memberUpdate(Lock& l) { if (store.hasStore()) { // Mark store clean if I am the only broker, dirty otherwise. if (size == 1 ) { - if (!store.isClean()) store.clean(Uuid(true)); - } else { - if (!store.isDirty()) store.dirty(clusterId); + if (store.getState() != STORE_STATE_CLEAN_STORE) { + QPID_LOG(notice, "Sole member of cluster, marking store clean."); + store.clean(Uuid(true)); + } + } + else { + if (store.getState() != STORE_STATE_DIRTY_STORE) { + QPID_LOG(notice, "No longer sole cluster member, marking store dirty."); + store.dirty(); + } } } @@ -1034,6 +1039,7 @@ broker::Broker& Cluster::getBroker() const { void Cluster::setClusterId(const Uuid& uuid, Lock&) { clusterId = uuid; + if (store.hasStore()) store.setClusterId(uuid); if (mgmtObject) { stringstream stream; stream << self; diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp index ea5501aba2..14c999bb05 100644 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/cpp/src/qpid/cluster/StoreStatus.cpp @@ -25,7 +25,9 @@ #include <boost/filesystem/path.hpp> #include <boost/filesystem/fstream.hpp> #include <boost/filesystem/operations.hpp> +#include <boost/scoped_array.hpp> #include <fstream> +#include <sstream> namespace qpid { namespace cluster { @@ -42,53 +44,29 @@ StoreStatus::StoreStatus(const std::string& d) namespace { const char* SUBDIR="cluster"; -const char* CLUSTER_ID_FILE="cluster.uuid"; -const char* SHUTDOWN_ID_FILE="shutdown.uuid"; - -void throw_exceptions(ios& ios) { - // Have stream throw an exception on error. - ios.exceptions(std::ios::badbit | std::ios::failbit); -} - -Uuid loadUuid(const fs::path& path) { - Uuid ret; - if (exists(path)) { - fs::ifstream i(path); - try { - throw_exceptions(i); - i >> ret; - } catch (const std::exception& e) { - QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what()); - throw; - } - } - return ret; +const char* STORE_STATUS="store.status"; + +string readFile(const fs::path& path) { + fs::ifstream is; + is.exceptions(std::ios::badbit | std::ios::failbit); + is.open(path); + // get length of file: + is.seekg (0, ios::end); + size_t length = is.tellg(); + is.seekg (0, ios::beg); + // load data + boost::scoped_array<char> buffer(new char[length]); + is.read(buffer.get(), length); + is.close(); + return string(buffer.get(), length); } -void saveUuid(const fs::path& path, const Uuid& uuid) { - fs::ofstream o(path); - try { - throw_exceptions(o); - o << uuid; - } catch (const std::exception& e) { - QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what()); - throw; - } -} - -framing::SequenceNumber loadSeqNum(const fs::path& path) { - uint32_t n = 0; - if (exists(path)) { - fs::ifstream i(path); - try { - throw_exceptions(i); - i >> n; - } catch (const std::exception& e) { - QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what()); - throw; - } - } - return framing::SequenceNumber(n); +void writeFile(const fs::path& path, const string& data) { + fs::ofstream os; + os.exceptions(std::ios::badbit | std::ios::failbit); + os.open(path); + os.write(data.data(), data.size()); + os.close(); } } // namespace @@ -98,14 +76,25 @@ void StoreStatus::load() { if (dataDir.empty()) { throw Exception(QPID_MSG("No data-dir: When a store is loaded together with clustering, --data-dir must be specified.")); } - fs::path dir = fs::path(dataDir, fs::native)/SUBDIR; try { + fs::path dir = fs::path(dataDir, fs::native)/SUBDIR; create_directory(dir); - clusterId = loadUuid(dir/CLUSTER_ID_FILE); - shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE); - if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE; - else if (clusterId) state = STORE_STATE_DIRTY_STORE; - else state = STORE_STATE_EMPTY_STORE; + fs::path file = dir/STORE_STATUS; + if (fs::exists(file)) { + string data = readFile(file); + istringstream is(data); + is.exceptions(std::ios::badbit | std::ios::failbit); + is >> ws >> clusterId >> ws >> shutdownId; + if (!clusterId) + throw Exception(QPID_MSG("Invalid cluster store state, no cluster-id")); + if (shutdownId) state = STORE_STATE_CLEAN_STORE; + else state = STORE_STATE_DIRTY_STORE; + } + else { // Starting from empty store + clusterId = Uuid(true); + save(); + state = STORE_STATE_EMPTY_STORE; + } } catch (const std::exception&e) { throw Exception(QPID_MSG("Cannot load cluster store status: " << e.what())); @@ -114,13 +103,13 @@ void StoreStatus::load() { void StoreStatus::save() { if (dataDir.empty()) return; - fs::path dir = fs::path(dataDir, fs::native)/SUBDIR; try { - create_directory(dir); - saveUuid(dir/CLUSTER_ID_FILE, clusterId); - saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId); + ostringstream os; + os << clusterId << endl << shutdownId << endl; + fs::path file = fs::path(dataDir, fs::native)/SUBDIR/STORE_STATUS; + writeFile(file, os.str()); } - catch (const std::exception&e) { + catch (const std::exception& e) { throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what())); } } @@ -129,20 +118,27 @@ bool StoreStatus::hasStore() const { return state != framing::cluster::STORE_STATE_NO_STORE; } -void StoreStatus::dirty(const Uuid& clusterId_) { - if (!hasStore()) return; - assert(clusterId_); - clusterId = clusterId_; - shutdownId = Uuid(); +void StoreStatus::dirty() { + assert(hasStore()); + if (shutdownId) { + shutdownId = Uuid(); + save(); + } state = STORE_STATE_DIRTY_STORE; - save(); } void StoreStatus::clean(const Uuid& shutdownId_) { - if (!hasStore()) return; + assert(hasStore()); assert(shutdownId_); + if (shutdownId_ != shutdownId) { + shutdownId = shutdownId_; + save(); + } state = STORE_STATE_CLEAN_STORE; - shutdownId = shutdownId_; +} + +void StoreStatus::setClusterId(const Uuid& clusterId_) { + clusterId = clusterId_; save(); } diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h index d9e4549175..7442fcf02c 100644 --- a/cpp/src/qpid/cluster/StoreStatus.h +++ b/cpp/src/qpid/cluster/StoreStatus.h @@ -42,21 +42,19 @@ class StoreStatus StoreStatus(const std::string& dir); framing::cluster::StoreState getState() const { return state; } - bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; } - bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; } const Uuid& getClusterId() const { return clusterId; } + void setClusterId(const Uuid&); const Uuid& getShutdownId() const { return shutdownId; } - void dirty(const Uuid& clusterId); // Mark the store in use by clusterId. - void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId - void load(); - void save(); - + void dirty(); // Mark the store in use. + void clean(const Uuid& shutdownId); // Mark the store clean. bool hasStore() const; private: + void save(); + framing::cluster::StoreState state; Uuid clusterId, shutdownId; std::string dataDir; diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index 67ca96d53f..945c0a4d24 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -81,8 +81,10 @@ ostream& operator<<(ostream& out, Uuid uuid) { istream& operator>>(istream& in, Uuid& uuid) { char unparsed[UNPARSED_SIZE + 1] = {0}; in.get(unparsed, sizeof(unparsed)); - if (uuid_parse(unparsed, uuid.c_array()) != 0) - in.setstate(ios::failbit); + if (!in.fail()) { + if (uuid_parse(unparsed, uuid.c_array()) != 0) + in.setstate(ios::failbit); + } return in; } diff --git a/cpp/src/tests/StoreStatus.cpp b/cpp/src/tests/StoreStatus.cpp index 87b3531236..43d4cfd920 100644 --- a/cpp/src/tests/StoreStatus.cpp +++ b/cpp/src/tests/StoreStatus.cpp @@ -65,7 +65,8 @@ QPID_AUTO_TEST_CASE(testSaveLoadDirty) { Uuid clusterId = Uuid(true); StoreStatus ss(TEST_DIR); ss.load(); - ss.dirty(clusterId); + ss.setClusterId(clusterId); + ss.dirty(); BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE); StoreStatus ss2(TEST_DIR); @@ -81,7 +82,7 @@ QPID_AUTO_TEST_CASE(testSaveLoadClean) { Uuid shutdownId = Uuid(true); StoreStatus ss(TEST_DIR); ss.load(); - ss.dirty(clusterId); + ss.setClusterId(clusterId); ss.clean(shutdownId); BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE); @@ -99,9 +100,10 @@ QPID_AUTO_TEST_CASE(testMarkDirty) { Uuid shutdownId = Uuid(true); StoreStatus ss(TEST_DIR); ss.load(); - ss.dirty(clusterId); + ss.setClusterId(clusterId); + ss.dirty(); ss.clean(shutdownId); - ss.dirty(clusterId); + ss.dirty(); StoreStatus ss2(TEST_DIR); ss2.load(); diff --git a/cpp/src/tests/Uuid.cpp b/cpp/src/tests/Uuid.cpp index 4c6505b139..0195455ca3 100644 --- a/cpp/src/tests/Uuid.cpp +++ b/cpp/src/tests/Uuid.cpp @@ -50,6 +50,7 @@ QPID_AUTO_TEST_CASE(testUuidCtor) { boost::array<uint8_t, 16> sample = {{'\x1b', '\x4e', '\x28', '\xba', '\x2f', '\xa1', '\x11', '\xd2', '\x88', '\x3f', '\xb9', '\xa7', '\x61', '\xbd', '\xe3', '\xfb'}}; const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb"); +const string zeroStr("00000000-0000-0000-0000-000000000000"); QPID_AUTO_TEST_CASE(testUuidIstream) { Uuid uuid; @@ -57,6 +58,12 @@ QPID_AUTO_TEST_CASE(testUuidIstream) { in >> uuid; BOOST_CHECK(!in.fail()); BOOST_CHECK(uuid == sample); + + istringstream is(zeroStr); + Uuid zero; + is >> zero; + BOOST_CHECK(!in.fail()); + BOOST_CHECK_EQUAL(zero, Uuid()); } QPID_AUTO_TEST_CASE(testUuidOstream) { @@ -65,6 +72,23 @@ QPID_AUTO_TEST_CASE(testUuidOstream) { out << uuid; BOOST_CHECK(out.good()); BOOST_CHECK_EQUAL(out.str(), sampleStr); + + ostringstream os; + os << Uuid(); + BOOST_CHECK(out.good()); + BOOST_CHECK_EQUAL(os.str(), zeroStr); +} + +QPID_AUTO_TEST_CASE(testUuidIOstream) { + Uuid a(true), b(true); + ostringstream os; + os << a << endl << b; + Uuid aa, bb; + istringstream is(os.str()); + is >> aa >> ws >> bb; + BOOST_CHECK(os.good()); + BOOST_CHECK_EQUAL(a, aa); + BOOST_CHECK_EQUAL(b, bb); } QPID_AUTO_TEST_CASE(testUuidEncodeDecode) { diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 6a748e6f2a..893fb9928d 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -310,7 +310,6 @@ class StoreTests(BrokerTest): c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) a.send_message("q", Message("4", durable=True)) a.kill() - time.sleep(0.1) # pause for b to write status. b.kill() self.assertEqual(c.get_message("q").content, "4") c.send_message("q", Message("clean", durable=True)) @@ -356,11 +355,6 @@ class StoreTests(BrokerTest): self.assertRaises(Exception, lambda: a.ready()) self.assertRaises(Exception, lambda: b.ready()) - def assert_dirty_store(self, broker): - assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log - msg = re.compile("critical.*no clean store") - assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log - def test_solo_store_clean(self): # A single node cluster should always leave a clean store. cluster = self.cluster(0, self.args()) @@ -378,29 +372,25 @@ class StoreTests(BrokerTest): # store. cluster = self.cluster(0, self.args()) a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + self.assertEqual(a.store_state(), "clean") b = cluster.start("b", expect=EXPECT_EXIT_FAIL) c = cluster.start("c", expect=EXPECT_EXIT_FAIL) + self.assertEqual(b.store_state(), "dirty") + self.assertEqual(c.store_state(), "dirty") + retry(lambda: a.store_state() == "dirty") + a.send_message("q", Message("x", durable=True)) a.kill() - # FIXME aconway 2010-03-29: this test has too many sleeps. - # Need to tighten up status persistence to be more atomic and less - # prone to interruption. - time.sleep(0.1) # pause for b to update status. - b.kill() # c is last man - time.sleep(0.1) # pause for c to find out hes last. + b.kill() # c is last man, will mark store clean + retry(lambda: c.store_state() == "clean") a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man - time.sleep(0.1) # pause for c to find out hes no longer last. - c.kill() # a is now last man - time.sleep(0.1) # pause for a to find out hes last. - a.kill() # really last, should be clean. - # b & c should be dirty - b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) - self.assert_dirty_store(b) - c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL) - self.assert_dirty_store(c) - # a should be clean - a = cluster.start("a") - self.assertEqual(a.get_message("q").content, "x") + retry(lambda: c.store_state() == "dirty") + c.kill() # a is now last man + retry(lambda: a.store_state() == "clean") + a.kill() + self.assertEqual(a.store_state(), "clean") + self.assertEqual(b.store_state(), "dirty") + self.assertEqual(c.store_state(), "dirty") def test_restart_clean(self): """Verify that we can re-start brokers one by one in a @@ -426,7 +416,6 @@ class StoreTests(BrokerTest): a.send_message("q", Message("x", durable=True)) a.send_message("q", Message("y", durable=True)) a.kill() - time.sleep(0.1) # pause for b to write status. b.kill() a = cluster.start("a") self.assertEqual(c.get_message("q").content, "x") diff --git a/cpp/src/tests/testagent.mk b/cpp/src/tests/testagent.mk index f5a84b07ff..cca380214f 100644 --- a/cpp/src/tests/testagent.mk +++ b/cpp/src/tests/testagent.mk @@ -36,7 +36,7 @@ TESTAGENT_GEN_SRC= \ $(TESTAGENT_GEN_SRC): testagent_gen.timestamp -testagent_gen.timestamp: testagent.xml +testagent_gen.timestamp: testagent.xml $(mgen_generator) $(QMF_GEN) -o testagent_gen/qmf $(srcdir)/testagent.xml touch testagent_gen.timestamp diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 2f064f59b6..3608a959b1 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -373,6 +373,13 @@ class Broker(Popen): try: self.connect().close() except: raise RethrownException("Broker %s failed ready test"%self.name) + def store_state(self): + uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines() + null_uuid="00000000-0000-0000-0000-000000000000\n" + if uuids[0] == null_uuid: return "empty" + if uuids[1] == null_uuid: return "dirty" + return "clean" + class Cluster: """A cluster of brokers in a test.""" |