summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-27 00:38:13 +0000
committerTed Ross <tross@apache.org>2010-02-27 00:38:13 +0000
commitacf3a1931ec404d1b02a2e115ef18e531d3924e4 (patch)
tree2a0b998795a676bae4ddc53cdacc82197885f771 /qpid/cpp/src
parent3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (diff)
downloadqpid-python-acf3a1931ec404d1b02a2e115ef18e531d3924e4.tar.gz
Rebased the wmf branch to the trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916887 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt5
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.cpp2
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp1
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp1
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.h6
-rw-r--r--qpid/cpp/src/qpid/console/Value.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/Array.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/BodyHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/FieldTable.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/FieldValue.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/List.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceSet.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/Uuid.cpp1
-rw-r--r--qpid/cpp/src/qpid/messaging/Variant.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp4
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Shlib.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/check.h2
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/TxMocks.h1
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py54
-rw-r--r--qpid/cpp/src/tests/failover_soak.cpp159
-rwxr-xr-xqpid/cpp/src/tests/run_failover_soak4
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in3
-rwxr-xr-x[-rw-r--r--]qpid/cpp/src/tests/verify_cluster_objects34
30 files changed, 196 insertions, 117 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 62bab239be..a7078bea47 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -855,7 +855,6 @@ set (qmfengine_SOURCES
qmf/engine/Protocol.h
qmf/engine/QueryImpl.cpp
qmf/engine/QueryImpl.h
- qmf/engine/ResilientConnection.cpp
qmf/engine/SequenceManager.cpp
qmf/engine/SequenceManager.h
qmf/engine/SchemaImpl.cpp
@@ -863,6 +862,10 @@ set (qmfengine_SOURCES
qmf/engine/ValueImpl.cpp
qmf/engine/ValueImpl.h
)
+if (NOT WIN32)
+ list(APPEND qmfengine_SOURCES qmf/engine/ResilientConnection.cpp)
+endif (NOT WIN32)
+
add_library (qmfengine SHARED ${qmfengine_SOURCES})
target_link_libraries (qmfengine qpidclient)
set_target_properties (qmfengine PROPERTIES
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp
index 79d5a491fc..27501cc396 100644
--- a/qpid/cpp/src/qmf/engine/EventImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/EventImpl.cpp
@@ -20,6 +20,8 @@
#include <qmf/engine/EventImpl.h>
#include <qmf/engine/ValueImpl.h>
+#include <sstream>
+
using namespace std;
using namespace qmf::engine;
using qpid::framing::Buffer;
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
index 670ee385a3..9216f7bac0 100644
--- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
@@ -19,6 +19,7 @@
#include "qmf/engine/ObjectIdImpl.h"
#include <stdlib.h>
+#include <sstream>
using namespace std;
using namespace qmf::engine;
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
index a4f56464a7..e276df8cec 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -23,6 +23,7 @@
#include <string>
#include <vector>
#include <assert.h>
+#include <sstream>
using namespace std;
using namespace qmf::engine;
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index d78c921c67..71a10559cf 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -25,6 +25,7 @@
#include <string>
#include <vector>
#include <exception>
+#include <memory>
namespace qmf {
namespace engine {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index e718819f48..f49fbb03a5 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -285,6 +285,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
}
Cluster::~Cluster() {
+ broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
}
@@ -914,6 +915,12 @@ void Cluster::memberUpdate(Lock& l) {
size_t size = urls.size();
failoverExchange->updateUrls(urls);
+ if (store.hasStore()) {
+ // Mark store clean if I am the only broker, dirty otherwise.
+ if (size == 1) store.clean(Uuid(true));
+ else store.dirty(clusterId);
+ }
+
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
@@ -996,7 +1003,6 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name)
timer->deliverWakeup(name);
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.h b/qpid/cpp/src/qpid/cluster/ClusterTimer.h
index 395e505451..69f6c622e4 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.h
@@ -30,6 +30,12 @@ namespace cluster {
class Cluster;
+/**
+ * Timer implementation that executes tasks consistently in the
+ * deliver thread across a cluster. Task is not executed when timer
+ * fires, instead the elder multicasts a wakeup. The task is executed
+ * when the wakeup is delivered.
+ */
class ClusterTimer : public sys::Timer {
public:
ClusterTimer(Cluster&);
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 3ae0c970c7..0856bcd824 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -54,7 +54,6 @@ void Cpg::callCpg ( CpgOp & c ) {
unsigned int snooze = 10;
for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) {
if ( CPG_OK == (result = c.op(handle, & group))) {
- QPID_LOG(info, c.opName << " successful.");
break;
}
else if ( result == CPG_ERR_TRY_AGAIN ) {
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
index cf75cd3b5f..648fcfbbd5 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -20,6 +20,7 @@
*/
#include "StoreStatus.h"
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/operations.hpp>
@@ -113,7 +114,12 @@ void StoreStatus::save() {
}
}
+bool StoreStatus::hasStore() const {
+ return state != framing::cluster::STORE_STATE_NO_STORE;
+}
+
void StoreStatus::dirty(const Uuid& clusterId_) {
+ if (!hasStore()) return;
assert(clusterId_);
clusterId = clusterId_;
shutdownId = Uuid();
@@ -122,6 +128,7 @@ void StoreStatus::dirty(const Uuid& clusterId_) {
}
void StoreStatus::clean(const Uuid& shutdownId_) {
+ if (!hasStore()) return;
assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
shutdownId = shutdownId_;
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
index 911b3a2ba2..2371f0424e 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.h
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -46,14 +46,14 @@ class StoreStatus
const Uuid& getShutdownId() const { return shutdownId; }
framing::SequenceNumber getConfigSeq() const { return configSeq; }
- void dirty(const Uuid& start); // Start using the store.
- void clean(const Uuid& stop); // Stop using the store.
+ void dirty(const Uuid& clusterId); // Mark the store in use by clusterId.
+ void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId
void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number.
void load();
void save();
- bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
+ bool hasStore() const;
private:
framing::cluster::StoreState state;
diff --git a/qpid/cpp/src/qpid/console/Value.cpp b/qpid/cpp/src/qpid/console/Value.cpp
index c30660f1dc..47c6a4ce57 100644
--- a/qpid/cpp/src/qpid/console/Value.cpp
+++ b/qpid/cpp/src/qpid/console/Value.cpp
@@ -22,6 +22,8 @@
#include "qpid/console/Value.h"
#include "qpid/framing/Buffer.h"
+#include <sstream>
+
using namespace qpid;
using namespace qpid::console;
using namespace std;
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
index 5c5920d786..d863970ece 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
@@ -24,6 +24,8 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/BodyFactory.h"
#include "qpid/framing/MethodBodyFactory.h"
+#include "qpid/Msg.h"
+
#include <boost/format.hpp>
#include <iostream>
diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp
index d95e0d167d..454e8e298f 100644
--- a/qpid/cpp/src/qpid/framing/Array.cpp
+++ b/qpid/cpp/src/qpid/framing/Array.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
#include <assert.h>
namespace qpid {
diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp
index e2128596ed..db302b1e4c 100644
--- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp
@@ -25,6 +25,7 @@
#include "qpid/framing/AMQHeartbeatBody.h"
#include <boost/cast.hpp>
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
using namespace qpid::framing;
using namespace boost;
diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp
index e2e91e450a..023e4af819 100644
--- a/qpid/cpp/src/qpid/framing/FieldTable.cpp
+++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp
@@ -25,6 +25,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
#include <assert.h>
namespace qpid {
diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp
index 0b49748de8..fd911645f4 100644
--- a/qpid/cpp/src/qpid/framing/FieldValue.cpp
+++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp
@@ -24,6 +24,7 @@
#include "qpid/framing/Endian.h"
#include "qpid/framing/List.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
namespace qpid {
namespace framing {
diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp
index bde7dabbac..963ebc206b 100644
--- a/qpid/cpp/src/qpid/framing/List.cpp
+++ b/qpid/cpp/src/qpid/framing/List.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
namespace qpid {
namespace framing {
diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
index dcfb4689b6..72fcd8a9e2 100644
--- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
@@ -22,6 +22,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
using namespace qpid::framing;
using std::max;
diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp
index 432c7ab94e..67ca96d53f 100644
--- a/qpid/cpp/src/qpid/framing/Uuid.cpp
+++ b/qpid/cpp/src/qpid/framing/Uuid.cpp
@@ -22,6 +22,7 @@
#include "qpid/Exception.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
namespace qpid {
namespace framing {
diff --git a/qpid/cpp/src/qpid/messaging/Variant.cpp b/qpid/cpp/src/qpid/messaging/Variant.cpp
index 116018f797..ba93f160ec 100644
--- a/qpid/cpp/src/qpid/messaging/Variant.cpp
+++ b/qpid/cpp/src/qpid/messaging/Variant.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/messaging/Variant.h"
+#include "qpid/Msg.h"
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <algorithm>
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index 4f0a4fa5af..fc95f46fb9 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -34,6 +34,7 @@ void AggregateOutput::activateOutput() { control.activateOutput(); }
void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
+namespace {
// Clear the busy flag and notify waiting threads in destructor.
struct ScopedBusy {
bool& flag;
@@ -41,7 +42,8 @@ struct ScopedBusy {
ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; }
~ScopedBusy() { flag = false; monitor.notifyAll(); }
};
-
+}
+
bool AggregateOutput::doOutput() {
Mutex::ScopedLock l(lock);
ScopedBusy sb(busy, lock);
diff --git a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
index 299331103c..3fb685d5b8 100644
--- a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -20,6 +20,7 @@
#include "qpid/sys/Shlib.h"
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <dlfcn.h>
diff --git a/qpid/cpp/src/qpid/sys/ssl/check.h b/qpid/cpp/src/qpid/sys/ssl/check.h
index 984c338a18..94db120afa 100644
--- a/qpid/cpp/src/qpid/sys/ssl/check.h
+++ b/qpid/cpp/src/qpid/sys/ssl/check.h
@@ -21,6 +21,8 @@
* under the License.
*
*/
+#include "qpid/Msg.h"
+
#include <iostream>
#include <string>
#include <nspr.h>
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 4d65803ac1..4a931ab680 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -298,7 +298,6 @@ TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
- BOOST_TEST_SHOW_PROGRESS=yes \
$(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h
index a34d864bae..72cb50cd21 100644
--- a/qpid/cpp/src/tests/TxMocks.h
+++ b/qpid/cpp/src/tests/TxMocks.h
@@ -23,6 +23,7 @@
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
#include <iostream>
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index b3274b1b1e..22b7c8f5b8 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -305,21 +305,47 @@ class StoreTests(BrokerTest):
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
- def test_total_failure(self):
- # Verify we abort with sutiable error message if no clean stores.
- cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
- a.kill()
- b.kill()
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- self.assertRaises(Exception, lambda: a.ready())
- self.assertRaises(Exception, lambda: b.ready())
+ def assert_dirty_store(self, broker):
+ self.assertRaises(Exception, lambda: broker.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(readfile(a.log))
- assert msg.search(readfile(b.log))
+ assert msg.search(readfile(broker.log))
+
+ def test_solo_store_clean(self):
+ # A single node cluster should always leave a clean store.
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ a.send_message("q", Message("x", durable=True))
+ a.kill()
+ a = cluster.start("a")
+ self.assertEqual(a.get_message("q").content, "x")
+
+ def test_last_store_clean(self):
+
+ # Verify that only the last node in a cluster to shut down has
+ # a clean store. Start with cluster of 3, reduce to 1 then
+ # increase again to ensure that a node that was once alone but
+ # finally did not finish as the last node does not get a clean
+ # store.
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
+ c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+ a.send_message("q", Message("x", durable=True))
+ a.kill()
+ b.kill() # c is last man
+ time.sleep(0.1) # pause for c to find out hes last.
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
+ c.kill() # a is now last man
+ time.sleep(0.1) # pause for a to find out hes last.
+ a.kill() # really last
+ # b & c should be dirty
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+ self.assert_dirty_store(b)
+ c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+ self.assert_dirty_store(c)
+ # a should be clean
+ a = cluster.start("a")
+ self.assertEqual(a.get_message("q").content, "x")
- # FIXME aconway 2009-12-03: verify manual restore procedure
diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp
index ed5c9cbee5..8bf6eca9e6 100644
--- a/qpid/cpp/src/tests/failover_soak.cpp
+++ b/qpid/cpp/src/tests/failover_soak.cpp
@@ -54,6 +54,8 @@ using namespace qpid::client;
namespace qpid {
namespace tests {
+vector<pid_t> pids;
+
typedef vector<ForkedBroker *> brokerVector;
typedef enum
@@ -184,17 +186,29 @@ struct children : public vector<child *>
int
checkChildren ( )
{
- vector<child *>::iterator i;
- for ( i = begin(); i != end(); ++ i )
- if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) )
- {
- cerr << "checkChildren: error on child of type "
- << (*i)->type
- << endl;
- return (*i)->retval;
- }
+ for ( unsigned int i = 0; i < pids.size(); ++ i )
+ {
+ int pid = pids[i];
+ int returned_pid;
+ int status;
- return 0;
+ child * kid = get ( pid );
+
+ if ( kid->status != COMPLETED )
+ {
+ returned_pid = waitpid ( pid, &status, WNOHANG );
+
+ if ( returned_pid == pid )
+ {
+ int exit_status = WEXITSTATUS(status);
+ exited ( pid, exit_status );
+ if ( exit_status ) // this is a child error.
+ return exit_status;
+ }
+ }
+ }
+
+ return 0;
}
@@ -323,6 +337,7 @@ startNewBroker ( brokerVector & brokers,
int verbosity,
int durable )
{
+ // ("--log-enable=notice+")
static int brokerId = 0;
stringstream path, prefix;
prefix << "soak-" << brokerId;
@@ -516,6 +531,7 @@ startReceivingClient ( brokerVector brokers,
argv.push_back ( 0 );
pid_t pid = fork();
+ pids.push_back ( pid );
if ( ! pid ) {
execv ( receiverPath, const_cast<char * const *>(&argv[0]) );
@@ -571,6 +587,7 @@ startSendingClient ( brokerVector brokers,
argv.push_back ( 0 );
pid_t pid = fork();
+ pids.push_back ( pid );
if ( ! pid ) {
execv ( senderPath, const_cast<char * const *>(&argv[0]) );
@@ -602,6 +619,7 @@ using namespace qpid::tests;
int
main ( int argc, char const ** argv )
{
+ int brokerKills = 0;
if ( argc != 11 ) {
cerr << "Usage: "
<< argv[0]
@@ -625,7 +643,6 @@ main ( int argc, char const ** argv )
int n_brokers = atoi(argv[i++]);
char const * host = "127.0.0.1";
- int maxBrokers = 50;
allMyChildren.verbosity = verbosity;
@@ -722,104 +739,86 @@ main ( int argc, char const ** argv )
int minSleep = 2,
- maxSleep = 4;
+ maxSleep = 6;
+ int totalBrokers = n_brokers;
- for ( int totalBrokers = n_brokers;
- totalBrokers < maxBrokers;
- ++ totalBrokers
- )
+ int loop = 0;
+
+ while ( 1 )
{
+ ++ loop;
+
+ /*
+ if ( verbosity > 1 )
+ std::cerr << "------- loop " << loop << " --------\n";
+
if ( verbosity > 0 )
cout << totalBrokers << " brokers have been added to the cluster.\n\n\n";
+ */
// Sleep for a while. -------------------------
int sleepyTime = mrand ( minSleep, maxSleep );
- if ( verbosity > 0 )
- cout << "Sleeping for " << sleepyTime << " seconds.\n";
sleep ( sleepyTime );
- // Kill the oldest broker. --------------------------
- if ( ! killFrontBroker ( brokers, verbosity ) )
+ int bullet = mrand ( 100 );
+ if ( bullet >= 95 )
+ {
+ fprintf ( stderr, "Killing oldest broker...\n" );
+
+ // Kill the oldest broker. --------------------------
+ if ( ! killFrontBroker ( brokers, verbosity ) )
+ {
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers, 5 );
+ std::cerr << "END_OF_TEST ERROR_BROKER\n";
+ return ERROR_KILLING_BROKER;
+ }
+ ++ brokerKills;
+
+ // Start a new broker. --------------------------
+ if ( verbosity > 0 )
+ cout << "Starting new broker.\n\n";
+
+ startNewBroker ( brokers,
+ moduleOrDir,
+ clusterName,
+ verbosity,
+ durable );
+ ++ totalBrokers;
+ printBrokers ( brokers );
+ cerr << brokerKills << " brokers have been killed.\n\n\n";
+ }
+
+ int retval = allMyChildren.checkChildren();
+ if ( retval )
{
- allMyChildren.killEverybody();
- killAllBrokers ( brokers, 5 );
- std::cerr << "END_OF_TEST ERROR_BROKER\n";
- return ERROR_KILLING_BROKER;
+ std::cerr << "END_OF_TEST ERROR_CLIENT\n";
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers, 5 );
+ return ERROR_ON_CHILD;
}
- // Sleep for a while. -------------------------
- sleepyTime = mrand ( minSleep, maxSleep );
- if ( verbosity > 1 )
- cerr << "Sleeping for " << sleepyTime << " seconds.\n";
- sleep ( sleepyTime );
-
- // Start a new broker. --------------------------
- if ( verbosity > 0 )
- cout << "Starting new broker.\n\n";
-
- startNewBroker ( brokers,
- moduleOrDir,
- clusterName,
- verbosity,
- durable );
-
- if ( verbosity > 1 )
- printBrokers ( brokers );
-
// If all children have exited, quit.
int unfinished = allMyChildren.unfinished();
- if ( ! unfinished ) {
+ if ( unfinished == 0 ) {
killAllBrokers ( brokers, 5 );
if ( verbosity > 1 )
cout << "failoverSoak: all children have exited.\n";
- int retval = allMyChildren.checkChildren();
- if ( verbosity > 1 )
- std::cerr << "failoverSoak: checkChildren: " << retval << endl;
-
- if ( retval )
- {
- std::cerr << "END_OF_TEST ERROR_CLIENT\n";
- return ERROR_ON_CHILD;
- }
- else
- {
- std::cerr << "END_OF_TEST SUCCESSFUL\n";
- return HUNKY_DORY;
- }
- }
- // Even if some are still running, if there's an error, quit.
- if ( allMyChildren.checkChildren() )
- {
- if ( verbosity > 0 )
- cout << "failoverSoak: error on child.\n";
- allMyChildren.killEverybody();
- killAllBrokers ( brokers, 5 );
- std::cerr << "END_OF_TEST ERROR_CLIENT\n";
- return ERROR_ON_CHILD;
+ std::cerr << "END_OF_TEST SUCCESSFUL\n";
+ return HUNKY_DORY;
}
- if ( verbosity > 1 ) {
- std::cerr << "------- next kill-broker loop --------\n";
- allMyChildren.print();
- }
}
- retval = allMyChildren.checkChildren();
- if ( verbosity > 1 )
- std::cerr << "failoverSoak: checkChildren: " << retval << endl;
-
- if ( verbosity > 1 )
- cout << "failoverSoak: maxBrokers reached.\n";
-
allMyChildren.killEverybody();
killAllBrokers ( brokers, 5 );
std::cerr << "END_OF_TEST SUCCESSFUL\n";
- return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+ return HUNKY_DORY;
}
diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak
index 69551a51c2..c276e9cc2f 100755
--- a/qpid/cpp/src/tests/run_failover_soak
+++ b/qpid/cpp/src/tests/run_failover_soak
@@ -26,12 +26,12 @@ host=127.0.0.1
unset QPID_NO_MODULE_DIR # failover_soak uses --module-dir, dont want clash
MODULES=${MODULES:-$moduledir}
-MESSAGES=${MESSAGES:-1000000}
+MESSAGES=${MESSAGES:-500000}
REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000}
VERBOSITY=${VERBOSITY:-10}
DURABILITY=${DURABILITY:-0}
N_QUEUES=${N_QUEUES:-1}
-N_BROKERS=${N_BROKERS:-3}
+N_BROKERS=${N_BROKERS:-4}
rm -f soak-*.log
exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 87fbbd128b..07bd4b2bee 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -73,3 +73,6 @@ exportmodule XML_LIB xml.so
export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules
export QPID_DATA_DIR= # Default to no data dir, not ~/.qpidd
+# Options for boost test framework
+export BOOST_TEST_SHOW_PROGRESS=yes
+export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects
index cea875662f..664b88cb3b 100644..100755
--- a/qpid/cpp/src/tests/verify_cluster_objects
+++ b/qpid/cpp/src/tests/verify_cluster_objects
@@ -75,6 +75,12 @@ class IpAddr:
bestAddr = addrPort
return bestAddr
+class ObjectId:
+ """Object identity, use for dictionaries by object id"""
+ def __init__(self, object): self.object = object
+ def __eq__(self, other): return self.object is other.object
+ def __hash__(self): return hash(id(self.object))
+
class Broker(object):
def __init__(self, qmf, broker):
self.broker = broker
@@ -94,6 +100,7 @@ class Broker(object):
self.uptime = 0
self.tablesByName = {}
self.package = "org.apache.qpid.broker"
+ self.id_cache = {} # Cache for getAbstractId
def getUrl(self):
return self.broker.getUrl()
@@ -114,13 +121,14 @@ class Broker(object):
#
def getAbstractId(self, object):
""" return a string the of the hierarchical name """
+ if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
global _debug_recursion
result = u""
valstr = u""
_debug_recursion += 1
debug_prefix = _debug_recursion
if (_verbose > 9):
- print debug_prefix, " enter gai: props ", self._properties
+ print debug_prefix, " enter gai: props ", object._properties
for property, value in object._properties:
# we want to recurse on things which are refs. we tell by
@@ -138,6 +146,7 @@ class Broker(object):
if property.name == "systemRef":
_debug_recursion -= 1
+ self.id_cache[ObjectId(object)] = ""
return ""
if property.index:
@@ -176,6 +185,7 @@ class Broker(object):
if (_verbose > 9):
print debug_prefix, " id ", self, " -> ", result
_debug_recursion -= 1
+ self.id_cache[ObjectId(object)] = result
return result
def loadTable(self, cls):
@@ -196,13 +206,12 @@ class Broker(object):
# error (ie, the name-generation code is busted) if we do
key = self.getAbstractId(obj)
if key in self.tablesByName[cls.getClassName()]:
- print "internal error: collision for %s on key %s\n" % (obj, key)
- sys.exit(1)
+ raise Exception("internal error: collision for %s on key %s\n"
+ % (obj, key))
- self.tablesByName[cls.getClassName()][self.getAbstractId(obj)] = obj
-# sys.exit(1)
+ self.tablesByName[cls.getClassName()][key] = obj
if _verbose > 1:
- print " ", obj.getObjectId(), " ", obj.getIndex(), " ", self.getAbstractId(obj)
+ print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key
class BrokerManager:
@@ -253,9 +262,10 @@ class BrokerManager:
raise Exception("Invalid URL 2")
addrList.append((tokens[1], tokens[2]))
- # Find the address in the list that is most likely to be in the same subnet as the address
- # with which we made the original QMF connection. This increases the probability that we will
- # be able to reach the cluster member.
+ # Find the address in the list that is most likely to be
+ # in the same subnet as the address with which we made the
+ # original QMF connection. This increases the probability
+ # that we will be able to reach the cluster member.
best = hostAddr.bestAddr(addrList)
bestUrl = best[0] + ":" + best[1]
@@ -284,8 +294,7 @@ class BrokerManager:
if _verbose > 0:
print " ", b
else:
- print "Failed - Not a cluster"
- sys.exit(1)
+ raise Exception("Failed - Not a cluster")
failures = []
@@ -348,11 +357,10 @@ class BrokerManager:
print "Failures:"
for failure in failures:
print " %s" % failure
- sys.exit(1)
+ raise Exception("Failures")
if _verbose > 0:
print "Success"
- sys.exit(0)
##
## Main Program