diff options
author | Ted Ross <tross@apache.org> | 2010-02-27 00:38:13 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-27 00:38:13 +0000 |
commit | acf3a1931ec404d1b02a2e115ef18e531d3924e4 (patch) | |
tree | 2a0b998795a676bae4ddc53cdacc82197885f771 /qpid/cpp/src | |
parent | 3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (diff) | |
download | qpid-python-acf3a1931ec404d1b02a2e115ef18e531d3924e4.tar.gz |
Rebased the wmf branch to the trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
30 files changed, 196 insertions, 117 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 62bab239be..a7078bea47 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -855,7 +855,6 @@ set (qmfengine_SOURCES qmf/engine/Protocol.h qmf/engine/QueryImpl.cpp qmf/engine/QueryImpl.h - qmf/engine/ResilientConnection.cpp qmf/engine/SequenceManager.cpp qmf/engine/SequenceManager.h qmf/engine/SchemaImpl.cpp @@ -863,6 +862,10 @@ set (qmfengine_SOURCES qmf/engine/ValueImpl.cpp qmf/engine/ValueImpl.h ) +if (NOT WIN32) + list(APPEND qmfengine_SOURCES qmf/engine/ResilientConnection.cpp) +endif (NOT WIN32) + add_library (qmfengine SHARED ${qmfengine_SOURCES}) target_link_libraries (qmfengine qpidclient) set_target_properties (qmfengine PROPERTIES diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp index 79d5a491fc..27501cc396 100644 --- a/qpid/cpp/src/qmf/engine/EventImpl.cpp +++ b/qpid/cpp/src/qmf/engine/EventImpl.cpp @@ -20,6 +20,8 @@ #include <qmf/engine/EventImpl.h> #include <qmf/engine/ValueImpl.h> +#include <sstream> + using namespace std; using namespace qmf::engine; using qpid::framing::Buffer; diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp index 670ee385a3..9216f7bac0 100644 --- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp @@ -19,6 +19,7 @@ #include "qmf/engine/ObjectIdImpl.h" #include <stdlib.h> +#include <sstream> using namespace std; using namespace qmf::engine; diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index a4f56464a7..e276df8cec 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -23,6 +23,7 @@ #include <string> #include <vector> #include <assert.h> +#include <sstream> using namespace std; using namespace qmf::engine; diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index d78c921c67..71a10559cf 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -25,6 +25,7 @@ #include <string> #include <vector> #include <exception> +#include <memory> namespace qmf { namespace engine { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index e718819f48..f49fbb03a5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -285,6 +285,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : } Cluster::~Cluster() { + broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } @@ -914,6 +915,12 @@ void Cluster::memberUpdate(Lock& l) { size_t size = urls.size(); failoverExchange->updateUrls(urls); + if (store.hasStore()) { + // Mark store clean if I am the only broker, dirty otherwise. + if (size == 1) store.clean(Uuid(true)); + else store.dirty(clusterId); + } + if (size == 1 && lastSize > 1 && state >= CATCHUP) { QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; @@ -996,7 +1003,6 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name) timer->deliverWakeup(name); } diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.h b/qpid/cpp/src/qpid/cluster/ClusterTimer.h index 395e505451..69f6c622e4 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.h +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.h @@ -30,6 +30,12 @@ namespace cluster { class Cluster; +/** + * Timer implementation that executes tasks consistently in the + * deliver thread across a cluster. Task is not executed when timer + * fires, instead the elder multicasts a wakeup. The task is executed + * when the wakeup is delivered. + */ class ClusterTimer : public sys::Timer { public: ClusterTimer(Cluster&); diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 3ae0c970c7..0856bcd824 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -54,7 +54,6 @@ void Cpg::callCpg ( CpgOp & c ) { unsigned int snooze = 10; for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { if ( CPG_OK == (result = c.op(handle, & group))) { - QPID_LOG(info, c.opName << " successful."); break; } else if ( result == CPG_ERR_TRY_AGAIN ) { diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp index cf75cd3b5f..648fcfbbd5 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp @@ -20,6 +20,7 @@ */ #include "StoreStatus.h" #include "qpid/Exception.h" +#include "qpid/Msg.h" #include <boost/filesystem/path.hpp> #include <boost/filesystem/fstream.hpp> #include <boost/filesystem/operations.hpp> @@ -113,7 +114,12 @@ void StoreStatus::save() { } } +bool StoreStatus::hasStore() const { + return state != framing::cluster::STORE_STATE_NO_STORE; +} + void StoreStatus::dirty(const Uuid& clusterId_) { + if (!hasStore()) return; assert(clusterId_); clusterId = clusterId_; shutdownId = Uuid(); @@ -122,6 +128,7 @@ void StoreStatus::dirty(const Uuid& clusterId_) { } void StoreStatus::clean(const Uuid& shutdownId_) { + if (!hasStore()) return; assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; shutdownId = shutdownId_; diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h index 911b3a2ba2..2371f0424e 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.h +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h @@ -46,14 +46,14 @@ class StoreStatus const Uuid& getShutdownId() const { return shutdownId; } framing::SequenceNumber getConfigSeq() const { return configSeq; } - void dirty(const Uuid& start); // Start using the store. - void clean(const Uuid& stop); // Stop using the store. + void dirty(const Uuid& clusterId); // Mark the store in use by clusterId. + void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number. void load(); void save(); - bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } + bool hasStore() const; private: framing::cluster::StoreState state; diff --git a/qpid/cpp/src/qpid/console/Value.cpp b/qpid/cpp/src/qpid/console/Value.cpp index c30660f1dc..47c6a4ce57 100644 --- a/qpid/cpp/src/qpid/console/Value.cpp +++ b/qpid/cpp/src/qpid/console/Value.cpp @@ -22,6 +22,8 @@ #include "qpid/console/Value.h" #include "qpid/framing/Buffer.h" +#include <sstream> + using namespace qpid; using namespace qpid::console; using namespace std; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index 5c5920d786..d863970ece 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -24,6 +24,8 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/BodyFactory.h" #include "qpid/framing/MethodBodyFactory.h" +#include "qpid/Msg.h" + #include <boost/format.hpp> #include <iostream> diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp index d95e0d167d..454e8e298f 100644 --- a/qpid/cpp/src/qpid/framing/Array.cpp +++ b/qpid/cpp/src/qpid/framing/Array.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" #include <assert.h> namespace qpid { diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp index e2128596ed..db302b1e4c 100644 --- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp +++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp @@ -25,6 +25,7 @@ #include "qpid/framing/AMQHeartbeatBody.h" #include <boost/cast.hpp> #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" using namespace qpid::framing; using namespace boost; diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp index e2e91e450a..023e4af819 100644 --- a/qpid/cpp/src/qpid/framing/FieldTable.cpp +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -25,6 +25,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" #include <assert.h> namespace qpid { diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp index 0b49748de8..fd911645f4 100644 --- a/qpid/cpp/src/qpid/framing/FieldValue.cpp +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/Endian.h" #include "qpid/framing/List.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" namespace qpid { namespace framing { diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp index bde7dabbac..963ebc206b 100644 --- a/qpid/cpp/src/qpid/framing/List.cpp +++ b/qpid/cpp/src/qpid/framing/List.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" namespace qpid { namespace framing { diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp index dcfb4689b6..72fcd8a9e2 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -22,6 +22,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" using namespace qpid::framing; using std::max; diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp index 432c7ab94e..67ca96d53f 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.cpp +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -22,6 +22,7 @@ #include "qpid/Exception.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" namespace qpid { namespace framing { diff --git a/qpid/cpp/src/qpid/messaging/Variant.cpp b/qpid/cpp/src/qpid/messaging/Variant.cpp index 116018f797..ba93f160ec 100644 --- a/qpid/cpp/src/qpid/messaging/Variant.cpp +++ b/qpid/cpp/src/qpid/messaging/Variant.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/messaging/Variant.h" +#include "qpid/Msg.h" #include <boost/format.hpp> #include <boost/lexical_cast.hpp> #include <algorithm> diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp index 4f0a4fa5af..fc95f46fb9 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp @@ -34,6 +34,7 @@ void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } +namespace { // Clear the busy flag and notify waiting threads in destructor. struct ScopedBusy { bool& flag; @@ -41,7 +42,8 @@ struct ScopedBusy { ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } ~ScopedBusy() { flag = false; monitor.notifyAll(); } }; - +} + bool AggregateOutput::doOutput() { Mutex::ScopedLock l(lock); ScopedBusy sb(busy, lock); diff --git a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp index 299331103c..3fb685d5b8 100644 --- a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp @@ -20,6 +20,7 @@ #include "qpid/sys/Shlib.h" #include "qpid/Exception.h" +#include "qpid/Msg.h" #include <dlfcn.h> diff --git a/qpid/cpp/src/qpid/sys/ssl/check.h b/qpid/cpp/src/qpid/sys/ssl/check.h index 984c338a18..94db120afa 100644 --- a/qpid/cpp/src/qpid/sys/ssl/check.h +++ b/qpid/cpp/src/qpid/sys/ssl/check.h @@ -21,6 +21,8 @@ * under the License. * */ +#include "qpid/Msg.h" + #include <iostream> #include <string> #include <nspr.h> diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 4d65803ac1..4a931ab680 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -298,7 +298,6 @@ TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ LIBTOOL="$(LIBTOOL)" \ QPID_DATA_DIR= \ - BOOST_TEST_SHOW_PROGRESS=yes \ $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h index a34d864bae..72cb50cd21 100644 --- a/qpid/cpp/src/tests/TxMocks.h +++ b/qpid/cpp/src/tests/TxMocks.h @@ -23,6 +23,7 @@ #include "qpid/Exception.h" +#include "qpid/Msg.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/broker/TxOp.h" #include <iostream> diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index b3274b1b1e..22b7c8f5b8 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -305,21 +305,47 @@ class StoreTests(BrokerTest): self.assertRaises(Exception, lambda: a.ready()) self.assertRaises(Exception, lambda: b.ready()) - def test_total_failure(self): - # Verify we abort with sutiable error message if no clean stores. - cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True) - a.kill() - b.kill() - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - self.assertRaises(Exception, lambda: a.ready()) - self.assertRaises(Exception, lambda: b.ready()) + def assert_dirty_store(self, broker): + self.assertRaises(Exception, lambda: broker.ready()) msg = re.compile("critical.*no clean store") - assert msg.search(readfile(a.log)) - assert msg.search(readfile(b.log)) + assert msg.search(readfile(broker.log)) + + def test_solo_store_clean(self): + # A single node cluster should always leave a clean store. + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + a.send_message("q", Message("x", durable=True)) + a.kill() + a = cluster.start("a") + self.assertEqual(a.get_message("q").content, "x") + + def test_last_store_clean(self): + + # Verify that only the last node in a cluster to shut down has + # a clean store. Start with cluster of 3, reduce to 1 then + # increase again to ensure that a node that was once alone but + # finally did not finish as the last node does not get a clean + # store. + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL) + a.send_message("q", Message("x", durable=True)) + a.kill() + b.kill() # c is last man + time.sleep(0.1) # pause for c to find out hes last. + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man + c.kill() # a is now last man + time.sleep(0.1) # pause for a to find out hes last. + a.kill() # really last + # b & c should be dirty + b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK) + self.assert_dirty_store(b) + c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK) + self.assert_dirty_store(c) + # a should be clean + a = cluster.start("a") + self.assertEqual(a.get_message("q").content, "x") - # FIXME aconway 2009-12-03: verify manual restore procedure diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp index ed5c9cbee5..8bf6eca9e6 100644 --- a/qpid/cpp/src/tests/failover_soak.cpp +++ b/qpid/cpp/src/tests/failover_soak.cpp @@ -54,6 +54,8 @@ using namespace qpid::client; namespace qpid { namespace tests { +vector<pid_t> pids; + typedef vector<ForkedBroker *> brokerVector; typedef enum @@ -184,17 +186,29 @@ struct children : public vector<child *> int checkChildren ( ) { - vector<child *>::iterator i; - for ( i = begin(); i != end(); ++ i ) - if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) - { - cerr << "checkChildren: error on child of type " - << (*i)->type - << endl; - return (*i)->retval; - } + for ( unsigned int i = 0; i < pids.size(); ++ i ) + { + int pid = pids[i]; + int returned_pid; + int status; - return 0; + child * kid = get ( pid ); + + if ( kid->status != COMPLETED ) + { + returned_pid = waitpid ( pid, &status, WNOHANG ); + + if ( returned_pid == pid ) + { + int exit_status = WEXITSTATUS(status); + exited ( pid, exit_status ); + if ( exit_status ) // this is a child error. + return exit_status; + } + } + } + + return 0; } @@ -323,6 +337,7 @@ startNewBroker ( brokerVector & brokers, int verbosity, int durable ) { + // ("--log-enable=notice+") static int brokerId = 0; stringstream path, prefix; prefix << "soak-" << brokerId; @@ -516,6 +531,7 @@ startReceivingClient ( brokerVector brokers, argv.push_back ( 0 ); pid_t pid = fork(); + pids.push_back ( pid ); if ( ! pid ) { execv ( receiverPath, const_cast<char * const *>(&argv[0]) ); @@ -571,6 +587,7 @@ startSendingClient ( brokerVector brokers, argv.push_back ( 0 ); pid_t pid = fork(); + pids.push_back ( pid ); if ( ! pid ) { execv ( senderPath, const_cast<char * const *>(&argv[0]) ); @@ -602,6 +619,7 @@ using namespace qpid::tests; int main ( int argc, char const ** argv ) { + int brokerKills = 0; if ( argc != 11 ) { cerr << "Usage: " << argv[0] @@ -625,7 +643,6 @@ main ( int argc, char const ** argv ) int n_brokers = atoi(argv[i++]); char const * host = "127.0.0.1"; - int maxBrokers = 50; allMyChildren.verbosity = verbosity; @@ -722,104 +739,86 @@ main ( int argc, char const ** argv ) int minSleep = 2, - maxSleep = 4; + maxSleep = 6; + int totalBrokers = n_brokers; - for ( int totalBrokers = n_brokers; - totalBrokers < maxBrokers; - ++ totalBrokers - ) + int loop = 0; + + while ( 1 ) { + ++ loop; + + /* + if ( verbosity > 1 ) + std::cerr << "------- loop " << loop << " --------\n"; + if ( verbosity > 0 ) cout << totalBrokers << " brokers have been added to the cluster.\n\n\n"; + */ // Sleep for a while. ------------------------- int sleepyTime = mrand ( minSleep, maxSleep ); - if ( verbosity > 0 ) - cout << "Sleeping for " << sleepyTime << " seconds.\n"; sleep ( sleepyTime ); - // Kill the oldest broker. -------------------------- - if ( ! killFrontBroker ( brokers, verbosity ) ) + int bullet = mrand ( 100 ); + if ( bullet >= 95 ) + { + fprintf ( stderr, "Killing oldest broker...\n" ); + + // Kill the oldest broker. -------------------------- + if ( ! killFrontBroker ( brokers, verbosity ) ) + { + allMyChildren.killEverybody(); + killAllBrokers ( brokers, 5 ); + std::cerr << "END_OF_TEST ERROR_BROKER\n"; + return ERROR_KILLING_BROKER; + } + ++ brokerKills; + + // Start a new broker. -------------------------- + if ( verbosity > 0 ) + cout << "Starting new broker.\n\n"; + + startNewBroker ( brokers, + moduleOrDir, + clusterName, + verbosity, + durable ); + ++ totalBrokers; + printBrokers ( brokers ); + cerr << brokerKills << " brokers have been killed.\n\n\n"; + } + + int retval = allMyChildren.checkChildren(); + if ( retval ) { - allMyChildren.killEverybody(); - killAllBrokers ( brokers, 5 ); - std::cerr << "END_OF_TEST ERROR_BROKER\n"; - return ERROR_KILLING_BROKER; + std::cerr << "END_OF_TEST ERROR_CLIENT\n"; + allMyChildren.killEverybody(); + killAllBrokers ( brokers, 5 ); + return ERROR_ON_CHILD; } - // Sleep for a while. ------------------------- - sleepyTime = mrand ( minSleep, maxSleep ); - if ( verbosity > 1 ) - cerr << "Sleeping for " << sleepyTime << " seconds.\n"; - sleep ( sleepyTime ); - - // Start a new broker. -------------------------- - if ( verbosity > 0 ) - cout << "Starting new broker.\n\n"; - - startNewBroker ( brokers, - moduleOrDir, - clusterName, - verbosity, - durable ); - - if ( verbosity > 1 ) - printBrokers ( brokers ); - // If all children have exited, quit. int unfinished = allMyChildren.unfinished(); - if ( ! unfinished ) { + if ( unfinished == 0 ) { killAllBrokers ( brokers, 5 ); if ( verbosity > 1 ) cout << "failoverSoak: all children have exited.\n"; - int retval = allMyChildren.checkChildren(); - if ( verbosity > 1 ) - std::cerr << "failoverSoak: checkChildren: " << retval << endl; - - if ( retval ) - { - std::cerr << "END_OF_TEST ERROR_CLIENT\n"; - return ERROR_ON_CHILD; - } - else - { - std::cerr << "END_OF_TEST SUCCESSFUL\n"; - return HUNKY_DORY; - } - } - // Even if some are still running, if there's an error, quit. - if ( allMyChildren.checkChildren() ) - { - if ( verbosity > 0 ) - cout << "failoverSoak: error on child.\n"; - allMyChildren.killEverybody(); - killAllBrokers ( brokers, 5 ); - std::cerr << "END_OF_TEST ERROR_CLIENT\n"; - return ERROR_ON_CHILD; + std::cerr << "END_OF_TEST SUCCESSFUL\n"; + return HUNKY_DORY; } - if ( verbosity > 1 ) { - std::cerr << "------- next kill-broker loop --------\n"; - allMyChildren.print(); - } } - retval = allMyChildren.checkChildren(); - if ( verbosity > 1 ) - std::cerr << "failoverSoak: checkChildren: " << retval << endl; - - if ( verbosity > 1 ) - cout << "failoverSoak: maxBrokers reached.\n"; - allMyChildren.killEverybody(); killAllBrokers ( brokers, 5 ); std::cerr << "END_OF_TEST SUCCESSFUL\n"; - return retval ? ERROR_ON_CHILD : HUNKY_DORY; + return HUNKY_DORY; } diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak index 69551a51c2..c276e9cc2f 100755 --- a/qpid/cpp/src/tests/run_failover_soak +++ b/qpid/cpp/src/tests/run_failover_soak @@ -26,12 +26,12 @@ host=127.0.0.1 unset QPID_NO_MODULE_DIR # failover_soak uses --module-dir, dont want clash MODULES=${MODULES:-$moduledir} -MESSAGES=${MESSAGES:-1000000} +MESSAGES=${MESSAGES:-500000} REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000} VERBOSITY=${VERBOSITY:-10} DURABILITY=${DURABILITY:-0} N_QUEUES=${N_QUEUES:-1} -N_BROKERS=${N_BROKERS:-3} +N_BROKERS=${N_BROKERS:-4} rm -f soak-*.log exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 87fbbd128b..07bd4b2bee 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -73,3 +73,6 @@ exportmodule XML_LIB xml.so export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules export QPID_DATA_DIR= # Default to no data dir, not ~/.qpidd +# Options for boost test framework +export BOOST_TEST_SHOW_PROGRESS=yes +export BOOST_TEST_CATCH_SYSTEM_ERRORS=no diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects index cea875662f..664b88cb3b 100644..100755 --- a/qpid/cpp/src/tests/verify_cluster_objects +++ b/qpid/cpp/src/tests/verify_cluster_objects @@ -75,6 +75,12 @@ class IpAddr: bestAddr = addrPort return bestAddr +class ObjectId: + """Object identity, use for dictionaries by object id""" + def __init__(self, object): self.object = object + def __eq__(self, other): return self.object is other.object + def __hash__(self): return hash(id(self.object)) + class Broker(object): def __init__(self, qmf, broker): self.broker = broker @@ -94,6 +100,7 @@ class Broker(object): self.uptime = 0 self.tablesByName = {} self.package = "org.apache.qpid.broker" + self.id_cache = {} # Cache for getAbstractId def getUrl(self): return self.broker.getUrl() @@ -114,13 +121,14 @@ class Broker(object): # def getAbstractId(self, object): """ return a string the of the hierarchical name """ + if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)] global _debug_recursion result = u"" valstr = u"" _debug_recursion += 1 debug_prefix = _debug_recursion if (_verbose > 9): - print debug_prefix, " enter gai: props ", self._properties + print debug_prefix, " enter gai: props ", object._properties for property, value in object._properties: # we want to recurse on things which are refs. we tell by @@ -138,6 +146,7 @@ class Broker(object): if property.name == "systemRef": _debug_recursion -= 1 + self.id_cache[ObjectId(object)] = "" return "" if property.index: @@ -176,6 +185,7 @@ class Broker(object): if (_verbose > 9): print debug_prefix, " id ", self, " -> ", result _debug_recursion -= 1 + self.id_cache[ObjectId(object)] = result return result def loadTable(self, cls): @@ -196,13 +206,12 @@ class Broker(object): # error (ie, the name-generation code is busted) if we do key = self.getAbstractId(obj) if key in self.tablesByName[cls.getClassName()]: - print "internal error: collision for %s on key %s\n" % (obj, key) - sys.exit(1) + raise Exception("internal error: collision for %s on key %s\n" + % (obj, key)) - self.tablesByName[cls.getClassName()][self.getAbstractId(obj)] = obj -# sys.exit(1) + self.tablesByName[cls.getClassName()][key] = obj if _verbose > 1: - print " ", obj.getObjectId(), " ", obj.getIndex(), " ", self.getAbstractId(obj) + print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key class BrokerManager: @@ -253,9 +262,10 @@ class BrokerManager: raise Exception("Invalid URL 2") addrList.append((tokens[1], tokens[2])) - # Find the address in the list that is most likely to be in the same subnet as the address - # with which we made the original QMF connection. This increases the probability that we will - # be able to reach the cluster member. + # Find the address in the list that is most likely to be + # in the same subnet as the address with which we made the + # original QMF connection. This increases the probability + # that we will be able to reach the cluster member. best = hostAddr.bestAddr(addrList) bestUrl = best[0] + ":" + best[1] @@ -284,8 +294,7 @@ class BrokerManager: if _verbose > 0: print " ", b else: - print "Failed - Not a cluster" - sys.exit(1) + raise Exception("Failed - Not a cluster") failures = [] @@ -348,11 +357,10 @@ class BrokerManager: print "Failures:" for failure in failures: print " %s" % failure - sys.exit(1) + raise Exception("Failures") if _verbose > 0: print "Success" - sys.exit(0) ## ## Main Program |