summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-09-19 15:13:18 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-09-19 15:13:18 +0000
commit69ae049561adde6b053c5906d0514c375814cae7 (patch)
tree11974930517338c66148a246a212c8eb6e00d486 /qpid/cpp/src/tests
parent2b6022a4eecaf48f44b5a4031d5b9f25b84adb1f (diff)
downloadqpid-python-69ae049561adde6b053c5906d0514c375814cae7.tar.gz
QPID-3346: merge in latest from trunk (r1172628)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1172657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/BrokerFixture.h32
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt11
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp26
-rw-r--r--qpid/cpp/src/tests/ExchangeTest.cpp2
-rw-r--r--qpid/cpp/src/tests/Makefile.am7
-rw-r--r--qpid/cpp/src/tests/MessageReplayTracker.cpp4
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp36
-rw-r--r--qpid/cpp/src/tests/Qmf2.cpp104
-rw-r--r--qpid/cpp/src/tests/QueueEvents.cpp4
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp14
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp60
-rw-r--r--qpid/cpp/src/tests/ReplicationTest.cpp2
-rw-r--r--qpid/cpp/src/tests/SessionState.cpp8
-rw-r--r--qpid/cpp/src/tests/SocketProxy.h183
-rw-r--r--qpid/cpp/src/tests/TxPublishTest.cpp3
-rw-r--r--qpid/cpp/src/tests/Url.cpp26
-rw-r--r--qpid/cpp/src/tests/XmlClientSessionTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/allhosts4
-rw-r--r--qpid/cpp/src/tests/brokertest.py33
-rw-r--r--qpid/cpp/src/tests/cluster_python_tests_failing.txt28
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py1
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py387
-rw-r--r--qpid/cpp/src/tests/exception_test.cpp14
-rwxr-xr-xqpid/cpp/src/tests/federated_topic_test27
-rwxr-xr-xqpid/cpp/src/tests/federation.py85
-rwxr-xr-xqpid/cpp/src/tests/federation_sys.py1900
-rwxr-xr-xqpid/cpp/src/tests/ipv6_test150
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark3
-rw-r--r--qpid/cpp/src/tests/qpid-perftest.cpp7
-rwxr-xr-xqpid/cpp/src/tests/run_federation_sys_tests97
-rwxr-xr-xqpid/cpp/src/tests/run_federation_tests2
-rw-r--r--qpid/cpp/src/tests/run_long_federation_sys_tests24
-rw-r--r--qpid/cpp/src/tests/run_store_tests.ps12
-rwxr-xr-xqpid/cpp/src/tests/sasl_test_setup.sh1
34 files changed, 2913 insertions, 376 deletions
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index 672d954572..92c6d22b57 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -22,8 +22,6 @@
*
*/
-#include "SocketProxy.h"
-
#include "qpid/broker/Broker.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionImpl.h"
@@ -71,16 +69,15 @@ struct BrokerFixture : private boost::noncopyable {
brokerThread = qpid::sys::Thread(*broker);
};
- void shutdownBroker()
- {
- broker->shutdown();
- broker = BrokerPtr();
+ void shutdownBroker() {
+ if (broker) {
+ broker->shutdown();
+ brokerThread.join();
+ broker = BrokerPtr();
+ }
}
- ~BrokerFixture() {
- if (broker) broker->shutdown();
- brokerThread.join();
- }
+ ~BrokerFixture() { shutdownBroker(); }
/** Open a connection to the broker. */
void open(qpid::client::Connection& c) {
@@ -97,20 +94,6 @@ struct LocalConnection : public qpid::client::Connection {
~LocalConnection() { close(); }
};
-/** A local client connection via a socket proxy. */
-struct ProxyConnection : public qpid::client::Connection {
- SocketProxy proxy;
- ProxyConnection(int brokerPort) : proxy(brokerPort) {
- open("localhost", proxy.getPort());
- }
- ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) {
- qpid::client::ConnectionSettings proxySettings(s);
- proxySettings.port = proxy.getPort();
- open(proxySettings);
- }
- ~ProxyConnection() { close(); }
-};
-
/** Convenience class to create and open a connection and session
* and some related useful objects.
*/
@@ -147,7 +130,6 @@ struct SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
};
typedef SessionFixtureT<LocalConnection> SessionFixture;
-typedef SessionFixtureT<ProxyConnection> ProxySessionFixture;
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 405718f12b..cc33478114 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -264,6 +264,14 @@ add_executable (qpid-send qpid-send.cpp Statistics.cpp ${platform_test_additions
target_link_libraries (qpid-send qpidmessaging)
remember_location(qpid-send)
+add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions})
+target_link_libraries (qpid-ping qpidclient)
+remember_location(qpid-ping)
+
+add_executable (datagen datagen.cpp ${platform_test_additions})
+target_link_libraries (datagen qpidclient)
+remember_location(datagen)
+
# qpid-perftest and qpid-latency-test are generally useful so install them
install (TARGETS qpid-perftest qpid-latency-test RUNTIME
DESTINATION ${QPID_INSTALL_BINDIR})
@@ -278,7 +286,7 @@ set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix}
add_test (unit_test ${test_wrap} ${unit_test_LOCATION})
add_test (start_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/start_broker${test_script_suffix})
-add_test (qpid-client-test ${test_wrap} ${qpid-client_test_LOCATION})
+add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION})
add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100)
add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix})
add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet)
@@ -288,6 +296,7 @@ if (PYTHON_EXECUTABLE)
endif (PYTHON_EXECUTABLE)
add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix})
if (PYTHON_EXECUTABLE)
+ add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
if (BUILD_ACL)
add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix})
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 3c0cff7350..30441cd03c 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -102,9 +102,9 @@ struct SimpleListener : public MessageListener
}
};
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
{
- ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : SessionFixture(opts) {
session.queueDeclare(arg::queue="my-queue");
}
};
@@ -150,16 +150,6 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData());
}
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspend0Timeout() {
- ClientSessionFixture fix;
- fix.session.suspend(); // session has 0 timeout.
- try {
- fix.connection.resume(fix.session);
- BOOST_FAIL("Expected InvalidArgumentException.");
- } catch(const InternalErrorException&) {}
-}
-
QPID_AUTO_TEST_CASE(testUseSuspendedError)
{
ClientSessionFixture fix;
@@ -171,18 +161,6 @@ QPID_AUTO_TEST_CASE(testUseSuspendedError)
} catch(const NotAttachedException&) {}
}
-// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
-void testSuspendResume() {
- ClientSessionFixture fix;
- fix.session.timeout(60);
- fix.session.suspend();
- // Make sure we are still subscribed after resume.
- fix.connection.resume(fix.session);
- fix.session.messageTransfer(arg::content=Message("my-message", "my-queue"));
- BOOST_CHECK_EQUAL("my-message", fix.subs.get("my-queue", TIME_SEC).getData());
-}
-
-
QPID_AUTO_TEST_CASE(testSendToSelf) {
ClientSessionFixture fix;
SimpleListener mylistener;
diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp
index 88a1cd99c2..fe72f42a46 100644
--- a/qpid/cpp/src/tests/ExchangeTest.cpp
+++ b/qpid/cpp/src/tests/ExchangeTest.cpp
@@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption)
TopicExchange topic ("topic1", false, args);
intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc");
+ msg1->insertCustomProperty("a", "abc");
DeliverableMessage dmsg1(msg1);
FieldTable args2;
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index dceabe36cc..78ac6db5f1 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -304,9 +304,9 @@ TESTS_ENVIRONMENT = \
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
run_msg_group_tests
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
- run_queue_flow_limit_tests
+ run_queue_flow_limit_tests ipv6_test
EXTRA_DIST += \
run_test vg_check \
@@ -321,6 +321,8 @@ EXTRA_DIST += \
config.null \
ais_check \
run_federation_tests \
+ run_federation_sys_tests \
+ run_long_federation_sys_tests \
run_cli_tests \
run_acl_tests \
.valgrind.supp \
@@ -362,6 +364,7 @@ LONG_TESTS+=start_broker \
fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
run_msg_groups_tests_soak \
stop_broker \
+ run_long_federation_sys_tests \
run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
diff --git a/qpid/cpp/src/tests/MessageReplayTracker.cpp b/qpid/cpp/src/tests/MessageReplayTracker.cpp
index 3d79ee53c2..e35f673683 100644
--- a/qpid/cpp/src/tests/MessageReplayTracker.cpp
+++ b/qpid/cpp/src/tests/MessageReplayTracker.cpp
@@ -51,7 +51,7 @@ class ReplayBufferChecker
QPID_AUTO_TEST_CASE(testReplay)
{
- ProxySessionFixture fix;
+ SessionFixture fix;
fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
MessageReplayTracker tracker(10);
@@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testReplay)
QPID_AUTO_TEST_CASE(testCheckCompletion)
{
- ProxySessionFixture fix;
+ SessionFixture fix;
fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
MessageReplayTracker tracker(10);
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index fae45a94d0..418653978b 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -611,6 +611,28 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
fix.admin.deleteQueue("q");
}
+QPID_AUTO_TEST_CASE(testAssertExchangeOption)
+{
+ MessagingFixture fix;
+ std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}";
+ Sender s1 = fix.session.createSender(a1);
+ s1.close();
+ Receiver r1 = fix.session.createReceiver(a1);
+ r1.close();
+
+ std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}";
+ Sender s2 = fix.session.createSender(a2);
+ s2.close();
+ BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed);
+
+ std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}";
+ BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed);
+ Receiver r3 = fix.session.createReceiver(a3);
+ r3.close();
+
+ fix.admin.deleteExchange("e");
+}
+
QPID_AUTO_TEST_CASE(testGetSender)
{
QueueFixture fix;
@@ -1064,6 +1086,20 @@ QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
}
+QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str());
+ Message out("test-message");
+ out.setSubject("my-subject");
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(Duration::SECOND * 5);
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject());
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/Qmf2.cpp b/qpid/cpp/src/tests/Qmf2.cpp
index 66c774accd..bc263d5c6d 100644
--- a/qpid/cpp/src/tests/Qmf2.cpp
+++ b/qpid/cpp/src/tests/Qmf2.cpp
@@ -23,12 +23,36 @@
#include "qmf/QueryImpl.h"
#include "qmf/SchemaImpl.h"
#include "qmf/exceptions.h"
-
+#include "qpid/messaging/Connection.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/ConsoleSessionImpl.h"
#include "unit_test.h"
+using namespace std;
using namespace qpid::types;
+using namespace qpid::messaging;
using namespace qmf;
+bool isReadable(int fd)
+{
+ fd_set rfds;
+ struct timeval tv;
+ int nfds, result;
+
+ FD_ZERO(&rfds);
+ FD_SET(fd, &rfds);
+ nfds = fd + 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ result = select(nfds, &rfds, NULL, NULL, &tv);
+
+ return result > 0;
+}
+
namespace qpid {
namespace tests {
@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
BOOST_CHECK_THROW(method.getArgument(3), QmfException);
}
+QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
+{
+ Connection connection("localhost");
+ AgentSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
+
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testGetHandle)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ BOOST_CHECK(notifier.getHandle() > 0);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableToFalse)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadable)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableMultiple)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ for (int i = 0; i < 15; i++)
+ PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testDeleteNotifier)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+ {
+ posix::EventNotifier notifier(session);
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+ }
+ BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp
index bd18fa45fb..cea8bbf0db 100644
--- a/qpid/cpp/src/tests/QueueEvents.cpp
+++ b/qpid/cpp/src/tests/QueueEvents.cpp
@@ -147,7 +147,7 @@ struct EventRecorder
QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
{
- ProxySessionFixture fixture;
+ SessionFixture fixture;
//register dummy event listener to broker
EventRecorder listener;
fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
@@ -194,7 +194,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
{
- ProxySessionFixture fixture;
+ SessionFixture fixture;
//register dummy event listener to broker
EventRecorder listener;
fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index 5455105078..f735e09449 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -152,7 +152,7 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (int i = 0; i < 10; i++) {
@@ -187,7 +187,7 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
@@ -259,7 +259,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
LocalQueue incoming;
@@ -285,7 +285,7 @@ QPID_AUTO_TEST_CASE(testPolicyWithDtx)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-policy-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
LocalQueue incoming;
@@ -345,7 +345,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore)
// Disable flow control, or else we'll never hit the max limit
args.setInt(QueueFlowLimit::flowStopCountKey, 0);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("my-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
LocalQueue incoming;
@@ -371,7 +371,7 @@ QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
policy->update(args);
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("q");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
f.session.txSelect();
@@ -388,7 +388,7 @@ QPID_AUTO_TEST_CASE(testCapacityConversion)
args.setString("qpid.max_count", "5");
args.setString("qpid.flow_stop_count", "0");
- ProxySessionFixture f;
+ SessionFixture f;
std::string q("q");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
for (int i = 0; i < 5; i++) {
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 98fbc2cba4..5274f2370d 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -81,13 +81,14 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
return msg;
}
@@ -441,10 +442,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
- msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
//enqueue 4 message
queue->deliver(msg1);
@@ -466,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
intrusive_ptr<Message> msg5 = create_message("e", "A");
intrusive_ptr<Message> msg6 = create_message("e", "B");
intrusive_ptr<Message> msg7 = create_message("e", "C");
- msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg5->insertCustomProperty(key,"a");
+ msg6->insertCustomProperty(key,"b");
+ msg7->insertCustomProperty(key,"c");
queue->deliver(msg5);
queue->deliver(msg6);
queue->deliver(msg7);
@@ -503,7 +504,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
queue->deliver(msg1);
queue->deliver(msg2);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -535,12 +536,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
- msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
- msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"b");
+ msg3->insertCustomProperty(key,"c");
+ msg4->insertCustomProperty(key,"a");
+ msg5->insertCustomProperty(key,"b");
+ msg6->insertCustomProperty(key,"c");
//enqueue 4 message
queue->deliver(msg1);
@@ -605,8 +606,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"a");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -649,8 +650,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
args.getLVQKey(key);
BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg1->insertCustomProperty(key,"a");
+ msg2->insertCustomProperty(key,"a");
// 3
queue1->deliver(msg1);
// 4
@@ -670,12 +671,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key");
- if (i % 2) {
- if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
- } else {
- if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
- }
+ intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
m->setTimestamp(new broker::ExpiryPolicy);
queue.deliver(m);
}
@@ -738,8 +734,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
intrusive_ptr<Message> msg = create_message("e", "A");
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", groups[i]);
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i);
+ msg->insertCustomProperty("GROUP-ID", groups[i]);
+ msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
}
@@ -885,8 +881,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Owners= ^C3,
intrusive_ptr<Message> msg = create_message("e", "A");
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "a");
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 9);
+ msg->insertCustomProperty("GROUP-ID", "a");
+ msg->insertCustomProperty("MY-ID", 9);
queue->deliver(msg);
// Queue = a-2, a-9
@@ -896,8 +892,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
BOOST_CHECK( !gotOne );
msg = create_message("e", "A");
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "b");
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 10);
+ msg->insertCustomProperty("GROUP-ID", "b");
+ msg->insertCustomProperty("MY-ID", 10);
queue->deliver(msg);
// Queue = a-2, a-9, b-10
@@ -927,7 +923,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
for (int i = 0; i < 3; ++i) {
intrusive_ptr<Message> msg = create_message("e", "A");
// no "GROUP-ID" header
- msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i);
+ msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
}
diff --git a/qpid/cpp/src/tests/ReplicationTest.cpp b/qpid/cpp/src/tests/ReplicationTest.cpp
index 7310a3fe20..1219a6b59e 100644
--- a/qpid/cpp/src/tests/ReplicationTest.cpp
+++ b/qpid/cpp/src/tests/ReplicationTest.cpp
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testReplicationExchange)
{
qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<string>("qpidd")
("--replication-exchange-name=qpid.replication")));
- ProxySessionFixture f(brokerOpts);
+ SessionFixture f(brokerOpts);
std::string dataQ("queue-1");
diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp
index 157cabfb63..3be9bb0cbc 100644
--- a/qpid/cpp/src/tests/SessionState.cpp
+++ b/qpid/cpp/src/tests/SessionState.cpp
@@ -43,7 +43,7 @@ using namespace qpid::framing;
// Apply f to [begin, end) and accumulate the result
template <class Iter, class T, class F>
T applyAccumulate(Iter begin, Iter end, T seed, const F& f) {
- return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, _2)));
+ return std::accumulate(begin, end, seed, boost::bind(std::plus<T>(), _1, boost::bind(f, _2)));
}
// Create a frame with a one-char string.
@@ -105,8 +105,8 @@ size_t transferN(qpid::SessionState& s, string content) {
char last = content[content.size()-1];
content.resize(content.size()-1);
size += applyAccumulate(content.begin(), content.end(), 0,
- bind(&send, ref(s),
- bind(contentFrameChar, _1, false)));
+ boost::bind(&send, boost::ref(s),
+ boost::bind(contentFrameChar, _1, false)));
size += send(s, contentFrameChar(last, true));
}
return size;
@@ -115,7 +115,7 @@ size_t transferN(qpid::SessionState& s, string content) {
// Send multiple transfers with single-byte content.
size_t transfers(qpid::SessionState& s, string content) {
return applyAccumulate(content.begin(), content.end(), 0,
- bind(transfer1Char, ref(s), _1));
+ boost::bind(transfer1Char, boost::ref(s), _1));
}
size_t contentFrameSize(size_t n=1) { return AMQFrame(( AMQContentBody())).encodedSize() + n; }
diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h
deleted file mode 100644
index d195f11aa9..0000000000
--- a/qpid/cpp/src/tests/SocketProxy.h
+++ /dev/null
@@ -1,183 +0,0 @@
-#ifndef SOCKETPROXY_H
-#define SOCKETPROXY_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/IOHandle.h"
-#ifdef _WIN32
-# include "qpid/sys/windows/IoHandlePrivate.h"
- typedef SOCKET FdType;
-#else
-# include "qpid/sys/posix/PrivatePosix.h"
- typedef int FdType;
-#endif
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/lexical_cast.hpp>
-
-namespace qpid {
-namespace tests {
-
-/**
- * A simple socket proxy that forwards to another socket.
- * Used between client & local broker to simulate network failures.
- */
-class SocketProxy : private qpid::sys::Runnable
-{
- // Need a Socket we can get the fd from
- class LowSocket : public qpid::sys::Socket {
- public:
-#ifdef _WIN32
- FdType getFd() { return toSocketHandle(*this); }
-#else
- FdType getFd() { return toFd(impl); }
-#endif
- };
-
- public:
- /** Connect to connectPort on host, start a forwarding thread.
- * Listen for connection on getPort().
- */
- SocketProxy(int connectPort, const std::string host="localhost")
- : closed(false), joined(true),
- port(listener.listen()), dropClient(), dropServer()
- {
- client.connect(host, boost::lexical_cast<std::string>(connectPort));
- joined = false;
- thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
- }
-
- ~SocketProxy() { close(); if (!joined) thread.join(); }
-
- /** Simulate a network disconnect. */
- void close() {
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (closed) { return; }
- closed=true;
- }
- if (thread && thread != qpid::sys::Thread::current()) {
- thread.join();
- joined = true;
- }
- client.close();
- }
-
- /** Simulate lost packets, drop data from client */
- void dropClientData(bool drop=true) { dropClient=drop; }
-
- /** Simulate lost packets, drop data from server */
- void dropServerData(bool drop=true) { dropServer=drop; }
-
- bool isClosed() const {
- qpid::sys::Mutex::ScopedLock l(lock);
- return closed;
- }
-
- uint16_t getPort() const { return port; }
-
- private:
- static void throwErrno(const std::string& msg) {
- throw qpid::Exception(msg+":"+qpid::sys::strError(errno));
- }
- static void throwIf(bool condition, const std::string& msg) {
- if (condition) throw qpid::Exception(msg);
- }
-
- void run() {
- std::auto_ptr<LowSocket> server;
- try {
- fd_set socks;
- FdType maxFd = listener.getFd();
- struct timeval tmo;
- for (;;) {
- FD_ZERO(&socks);
- FD_SET(maxFd, &socks);
- tmo.tv_sec = 0;
- tmo.tv_usec = 500 * 1000;
- if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
- qpid::sys::Mutex::ScopedLock l(lock);
- throwIf(closed, "SocketProxy: Closed by close()");
- continue;
- }
- throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed");
- break; // Accept ready... go to next step
- }
- server.reset(reinterpret_cast<LowSocket *>(listener.accept()));
- maxFd = server->getFd();
- if (client.getFd() > maxFd)
- maxFd = client.getFd();
- char buffer[1024];
- for (;;) {
- FD_ZERO(&socks);
- tmo.tv_sec = 0;
- tmo.tv_usec = 500 * 1000;
- FD_SET(client.getFd(), &socks);
- FD_SET(server->getFd(), &socks);
- if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
- qpid::sys::Mutex::ScopedLock l(lock);
- throwIf(closed, "SocketProxy: Closed by close()");
- continue;
- }
- // Something is set; relay data as needed until something closes
- if (FD_ISSET(server->getFd(), &socks)) {
- int n = server->read(buffer, sizeof(buffer));
- throwIf(n <= 0, "SocketProxy: server disconnected");
- if (!dropServer) client.write(buffer, n);
- }
- if (FD_ISSET(client.getFd(), &socks)) {
- int n = client.read(buffer, sizeof(buffer));
- throwIf(n <= 0, "SocketProxy: client disconnected");
- if (!dropServer) server->write(buffer, n);
- }
- if (!FD_ISSET(client.getFd(), &socks) &&
- !FD_ISSET(server->getFd(), &socks))
- throwIf(true, "SocketProxy: No handle ready");
- }
- }
- catch (const std::exception& e) {
- QPID_LOG(debug, "SocketProxy::run exception: " << e.what());
- }
- try {
- if (server.get()) server->close();
- close();
- }
- catch (const std::exception& e) {
- QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what());
- }
- }
-
- mutable qpid::sys::Mutex lock;
- mutable bool closed;
- bool joined;
- LowSocket client, listener;
- uint16_t port;
- qpid::sys::Thread thread;
- bool dropClient, dropServer;
-};
-
-}} // namespace qpid::tests
-
-#endif
diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp
index 210abf0a5b..152581e4ba 100644
--- a/qpid/cpp/src/tests/TxPublishTest.cpp
+++ b/qpid/cpp/src/tests/TxPublishTest.cpp
@@ -50,10 +50,9 @@ struct TxPublishTest
TxPublishTest() :
queue1(new Queue("queue1", false, &store, 0)),
queue2(new Queue("queue2", false, &store, 0)),
- msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")),
+ msg(MessageUtils::createMessage("exchange", "routing_key", true)),
op(msg)
{
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}
diff --git a/qpid/cpp/src/tests/Url.cpp b/qpid/cpp/src/tests/Url.cpp
index 234a62ee91..b30de682bc 100644
--- a/qpid/cpp/src/tests/Url.cpp
+++ b/qpid/cpp/src/tests/Url.cpp
@@ -60,6 +60,32 @@ QPID_AUTO_TEST_CASE(TestParseXyz) {
BOOST_CHECK_EQUAL(Url("xyz:host").str(), "amqp:xyz:host:5672");
}
+QPID_AUTO_TEST_CASE(TestParseTricky) {
+ BOOST_CHECK_EQUAL(Url("amqp").str(), "amqp:tcp:amqp:5672");
+ BOOST_CHECK_EQUAL(Url("amqp:tcp").str(), "amqp:tcp:tcp:5672");
+ // These are ambiguous parses and arguably not the best result
+ BOOST_CHECK_EQUAL(Url("amqp:876").str(), "amqp:tcp:876:5672");
+ BOOST_CHECK_EQUAL(Url("tcp:567").str(), "amqp:tcp:567:5672");
+}
+
+QPID_AUTO_TEST_CASE(TestParseIPv6) {
+ Url u1("[::]");
+ BOOST_CHECK_EQUAL(u1[0].host, "::");
+ BOOST_CHECK_EQUAL(u1[0].port, 5672);
+ Url u2("[::1]");
+ BOOST_CHECK_EQUAL(u2[0].host, "::1");
+ BOOST_CHECK_EQUAL(u2[0].port, 5672);
+ Url u3("[::127.0.0.1]");
+ BOOST_CHECK_EQUAL(u3[0].host, "::127.0.0.1");
+ BOOST_CHECK_EQUAL(u3[0].port, 5672);
+ Url u4("[2002::222:68ff:fe0b:e61a]");
+ BOOST_CHECK_EQUAL(u4[0].host, "2002::222:68ff:fe0b:e61a");
+ BOOST_CHECK_EQUAL(u4[0].port, 5672);
+ Url u5("[2002::222:68ff:fe0b:e61a]:123");
+ BOOST_CHECK_EQUAL(u5[0].host, "2002::222:68ff:fe0b:e61a");
+ BOOST_CHECK_EQUAL(u5[0].port, 123);
+}
+
QPID_AUTO_TEST_CASE(TestParseMultiAddress) {
Url::addProtocol("xyz");
URL_CHECK_STR("amqp:tcp:host:0,xyz:foo:123,tcp:foo:0,xyz:bar:1");
diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
index b3b7f12b53..b94c35ece0 100644
--- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp
@@ -90,7 +90,7 @@ struct SimpleListener : public MessageListener
}
};
-struct ClientSessionFixture : public ProxySessionFixture
+struct ClientSessionFixture : public SessionFixture
{
void declareSubscribe(const string& q="odd_blue",
const string& dest="xml")
diff --git a/qpid/cpp/src/tests/allhosts b/qpid/cpp/src/tests/allhosts
index e43571aed4..4b4b943156 100755
--- a/qpid/cpp/src/tests/allhosts
+++ b/qpid/cpp/src/tests/allhosts
@@ -29,11 +29,12 @@ Options:
-s SECONDS sleep between starting commands.
-q don't print banner lines for each host.
-o SUFFIX log output of each command to <host>.SUFFIX
+ -X passed to ssh - forward X connection.
"
exit 1
}
-while getopts "tl:bs:dqo:" opt; do
+while getopts "tl:bs:dqo:X" opt; do
case $opt in
l) SSHOPTS="-l$OPTARG $SSHOPTS" ;;
t) SSHOPTS="-t $SSHOPTS" ;;
@@ -42,6 +43,7 @@ while getopts "tl:bs:dqo:" opt; do
s) SLEEP="sleep $OPTARG" ;;
q) NOBANNER=1 ;;
o) SUFFIX=$OPTARG ;;
+ X) SSHOPTS="-X $SSHOPTS" ;;
*) usage;;
esac
done
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 4a98c638a2..16d7fb0b78 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -251,7 +251,7 @@ class Broker(Popen):
def get_log(self):
return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -280,6 +280,7 @@ class Broker(Popen):
cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
+ if show_cmd: print cmd
Popen.__init__(self, cmd, expect, stdout=PIPE)
test.cleanup_stop(self)
self._host = "127.0.0.1"
@@ -400,7 +401,7 @@ class Cluster:
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
@@ -411,16 +412,19 @@ class Cluster:
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait)
+ self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
+ self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
- for i in range(count): self.start(expect=expect, wait=wait, args=args)
+ def ready(self):
+ for b in self: b.ready()
+
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -477,31 +481,30 @@ class BrokerTest(TestCase):
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
return cluster
def browse(self, session, queue, timeout=0):
- """Assert that the contents of messages on queue (as retrieved
- using session and timeout) exactly match the strings in
- expect_contents"""
+ """Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
+ r.capacity = 100
try:
contents = []
try:
while True: contents.append(r.fetch(timeout=timeout).content)
except messaging.Empty: pass
- finally: pass #FIXME aconway 2011-04-14: r.close()
+ finally: r.close()
return contents
def assert_browse(self, session, queue, expect_contents, timeout=0):
diff --git a/qpid/cpp/src/tests/cluster_python_tests_failing.txt b/qpid/cpp/src/tests/cluster_python_tests_failing.txt
index 7ba8089946..f8639d7b59 100644
--- a/qpid/cpp/src/tests/cluster_python_tests_failing.txt
+++ b/qpid/cpp/src/tests/cluster_python_tests_failing.txt
@@ -1,32 +1,4 @@
qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue
qpid_tests.broker_0_10.management.ManagementTest.test_connection_close
-qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid
-qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_recover
-qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_select_required
-qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume
qpid_tests.broker_0_10.message.MessageTests.test_ttl
qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index a0ce8fb9c3..3c7e8e8020 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -53,6 +53,7 @@ def filter_log(log):
'stall for update|unstall, ignore update|cancelled offer .* unstall',
'caught up',
'active for links|Passivating links|Activating links',
+ 'info Connecting: .*', # UpdateClient connection
'info Connection.* connected to', # UpdateClient connection
'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
'warning Broker closed connection: 200, OK',
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 13f4f716da..879dcdaeaf 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -164,6 +164,121 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
+ def test_sasl_join(self):
+ """Verify SASL authentication between brokers when joining a cluster."""
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+
+ # Valid user/password, ensure queue is created.
+ c = cluster[0].connect(username="zig", password="zig")
+ c.session().sender("ziggy;{create:always}")
+ c.close()
+ c = cluster[1].connect(username="zig", password="zig")
+ c.session().receiver("ziggy;{assert:always}")
+ c.close()
+ for b in cluster: b.ready() # Make sure all brokers still running.
+
+ # Valid user, bad password
+ try:
+ cluster[0].connect(username="zig", password="foo").close()
+ self.fail("Expected exception")
+ except messaging.exceptions.ConnectionError: pass
+ for b in cluster: b.ready() # Make sure all brokers still running.
+
+ # Bad user ID
+ try:
+ cluster[0].connect(username="foo", password="bar").close()
+ self.fail("Expected exception")
+ except messaging.exceptions.ConnectionError: pass
+ for b in cluster: b.ready() # Make sure all brokers still running.
+
+ # Action disallowed by ACL
+ c = cluster[0].connect(username="zag", password="zag")
+ try:
+ s = c.session()
+ s.sender("zaggy;{create:always}")
+ s.close()
+ self.fail("Expected exception")
+ except messaging.exceptions.UnauthorizedAccess: pass
+ # make sure the queue was not created at the other node.
+ c = cluster[0].connect(username="zag", password="zag")
+ try:
+ s = c.session()
+ s.sender("zaggy;{assert:always}")
+ s.close()
+ self.fail("Expected exception")
+ except messaging.exceptions.NotFound: pass
+
+ def test_sasl_join(self):
+ """Verify SASL authentication between brokers when joining a cluster."""
+ # Valid user/password, ensure queue is created.
+ c = cluster[0].connect(username="zig", password="zig")
+ c.session().sender("ziggy;{create:always}")
+ c.close()
+ c = cluster[1].connect(username="zig", password="zig")
+ c.session().receiver("ziggy;{assert:always}")
+ c.close()
+ for b in cluster: b.ready() # Make sure all brokers still running.
+
+ # Valid user, bad password
+ try:
+ cluster[0].connect(username="zig", password="foo").close()
+ self.fail("Expected exception")
+ except messaging.exceptions.ConnectionError: pass
+ for b in cluster: b.ready() # Make sure all brokers still running.
+
+ # Bad user ID
+ try:
+ cluster[0].connect(username="foo", password="bar").close()
+ self.fail("Expected exception")
+ except messaging.exceptions.ConnectionError: pass
+ for b in cluster: b.ready() # Make sure all brokers still running.
+
+ # Action disallowed by ACL
+ c = cluster[0].connect(username="zag", password="zag")
+ try:
+ s = c.session()
+ s.sender("zaggy;{create:always}")
+ s.close()
+ self.fail("Expected exception")
+ except messaging.exceptions.UnauthorizedAccess: pass
+ # make sure the queue was not created at the other node.
+ c = cluster[0].connect(username="zag", password="zag")
+ try:
+ s = c.session()
+ s.sender("zaggy;{assert:always}")
+ s.close()
+ self.fail("Expected exception")
+ except messaging.exceptions.NotFound: pass
+
+ def test_sasl_join(self):
+ """Verify SASL authentication between brokers when joining a cluster."""
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ # Test with a valid username/password
+ cluster = self.cluster(1, args=["--auth", "yes",
+ "--sasl-config", sasl_config,
+ "--load-module", os.getenv("ACL_LIB"),
+ "--cluster-username=zig",
+ "--cluster-password=zig",
+ "--cluster-mechanism=PLAIN"
+ ])
+ cluster.start()
+ cluster.ready()
+ c = cluster[1].connect(username="zag", password="zag")
+
+ # Test with an invalid username/password
+ cluster = self.cluster(1, args=["--auth", "yes",
+ "--sasl-config", sasl_config,
+ "--load-module", os.getenv("ACL_LIB"),
+ "--cluster-username=x",
+ "--cluster-password=y",
+ "--cluster-mechanism=PLAIN"
+ ])
+ try:
+ cluster.start(expect=EXPECT_EXIT_OK)
+ cluster[1].ready()
+ self.fail("Expected exception")
+ except: pass
+
def test_user_id_update(self):
"""Ensure that user-id of an open session is updated to new cluster members"""
sasl_config=os.path.join(self.rootdir, "sasl_config")
@@ -713,6 +828,272 @@ acl allow all all
cluster.start()
fetch(cluster[2])
+# Some utility code for transaction tests
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxStatusException(Exception):
+ def __init__(self, expect, actual):
+ self.expect = expect
+ self.actual = actual
+
+ def str(self):
+ return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual)
+
+class DtxTestFixture:
+ """Bundle together some common requirements for dtx tests."""
+ def __init__(self, test, broker, name, exclusive=False):
+ self.test = test
+ self.broker = broker
+ self.name = name
+ # Use old API. DTX is not supported in messaging API.
+ self.connection = broker.connect_old()
+ self.session = self.connection.session(name, 1) # 1 second timeout
+ self.queue = self.session.queue_declare(name, exclusive=exclusive)
+ self.session.dtx_select()
+ self.consumer = None
+
+ def xid(self, id=None):
+ if id is None: id = self.name
+ return self.session.xid(format=0, global_id=id)
+
+ def check_status(self, expect, actual):
+ if expect != actual: raise DtxStatusException(expect, actual)
+
+ def start(self, id=None, resume=False):
+ self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status)
+
+ def end(self, id=None, suspend=False):
+ self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status)
+
+ def prepare(self, id=None):
+ self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status)
+
+ def commit(self, id=None, one_phase=True):
+ self.check_status(
+ XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status)
+
+ def rollback(self, id=None):
+ self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status)
+
+ def set_timeout(self, timeout, id=None):
+ self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout)
+
+ def send(self, messages):
+ for m in messages:
+ dp=self.session.delivery_properties(routing_key=self.name)
+ mp=self.session.message_properties()
+ self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+ def accept(self):
+ """Accept 1 message from queue"""
+ consumer_tag="%s-consumer"%(self.name)
+ self.session.message_subscribe(queue=self.name, destination=consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+ self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ msg = self.session.incoming(consumer_tag).get(timeout=1)
+ self.session.message_cancel(destination=consumer_tag)
+ self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+ return msg
+
+
+ def verify(self, sessions, messages):
+ for s in sessions:
+ self.test.assert_browse(s, self.name, messages)
+
+class DtxTests(BrokerTest):
+
+ def test_dtx_update(self):
+ """Verify that DTX transaction state is updated to a new broker.
+ Start a collection of transactions, then add a new cluster member,
+ then verify they commit/rollback correctly on the new broker."""
+
+ # Note: multiple test have been bundled into one to avoid the need to start/stop
+ # multiple brokers per test.
+
+ cluster=self.cluster(1)
+ sessions = [cluster[0].connect().session()] # For verify
+
+ # Transaction that will be open when new member joins, then committed.
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.send(["1", "2"])
+ t1.verify(sessions, []) # Not visible outside of transaction
+
+ # Transaction that will be open when new member joins, then rolled back.
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.start()
+ t2.send(["1", "2"])
+
+ # Transaction that will be prepared when new member joins, then committed.
+ t3 = DtxTestFixture(self, cluster[0], "t3")
+ t3.start()
+ t3.send(["1", "2"])
+ t3.end()
+ t3.prepare()
+ t1.verify(sessions, []) # Not visible outside of transaction
+
+ # Transaction that will be prepared when new member joins, then rolled back.
+ t4 = DtxTestFixture(self, cluster[0], "t4")
+ t4.start()
+ t4.send(["1", "2"])
+ t4.end()
+ t4.prepare()
+
+ # Transaction using an exclusive queue
+ t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+ t5.start()
+ t5.send(["1", "2"])
+
+ # Accept messages in a transaction before/after join then commit
+ t6 = DtxTestFixture(self, cluster[0], "t6")
+ t6.send(["a","b","c"])
+ t6.start()
+ self.assertEqual(t6.accept().body, "a");
+
+ # Accept messages in a transaction before/after join then roll back
+ t7 = DtxTestFixture(self, cluster[0], "t7")
+ t7.send(["a","b","c"])
+ t7.start()
+ self.assertEqual(t7.accept().body, "a");
+
+ # Ended, suspended transactions across join.
+ t8 = DtxTestFixture(self, cluster[0], "t8")
+ t8.start(id="1")
+ t8.send(["x"])
+ t8.end(id="1", suspend=True)
+ t8.start(id="2")
+ t8.send(["y"])
+ t8.end(id="2")
+ t8.start()
+ t8.send("z")
+
+
+ # Start new cluster member
+ cluster.start()
+ sessions.append(cluster[1].connect().session())
+
+ # Commit t1
+ t1.send(["3","4"])
+ t1.verify(sessions, [])
+ t1.end()
+ t1.commit(one_phase=True)
+ t1.verify(sessions, ["1","2","3","4"])
+
+ # Rollback t2
+ t2.send(["3","4"])
+ t2.end()
+ t2.rollback()
+ t2.verify(sessions, [])
+
+ # Commit t3
+ t3.commit(one_phase=False)
+ t3.verify(sessions, ["1","2"])
+
+ # Rollback t4
+ t4.rollback()
+ t4.verify(sessions, [])
+
+ # Commit t5
+ t5.send(["3","4"])
+ t5.verify(sessions, [])
+ t5.end()
+ t5.commit(one_phase=True)
+ t5.verify(sessions, ["1","2","3","4"])
+
+ # Commit t6
+ self.assertEqual(t6.accept().body, "b");
+ t6.verify(sessions, ["c"])
+ t6.end()
+ t6.commit(one_phase=True)
+ t6.session.close() # Make sure they're not requeued by the session.
+ t6.verify(sessions, ["c"])
+
+ # Rollback t7
+ self.assertEqual(t7.accept().body, "b");
+ t7.end()
+ t7.rollback()
+ t7.verify(sessions, ["a", "b", "c"])
+
+ # Resume t8
+ t8.end()
+ t8.commit(one_phase=True)
+ t8.start("1", resume=True)
+ t8.end("1")
+ t8.commit("1", one_phase=True)
+ t8.commit("2", one_phase=True)
+ t8.verify(sessions, ["z", "x","y"])
+
+
+ def test_dtx_failover_rollback(self):
+ """Kill a broker during a transaction, verify we roll back correctly"""
+ cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ cluster.start(expect=EXPECT_RUNNING)
+
+ # Test unprepared at crash
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.send(["a"]) # Not in transaction
+ t1.start()
+ t1.send(["b"]) # In transaction
+
+ # Test prepared at crash
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.send(["a"]) # Not in transaction
+ t2.start()
+ t2.send(["b"]) # In transaction
+ t2.end()
+ t2.prepare()
+
+ # Crash the broker
+ cluster[0].kill()
+
+ # Transactional changes should not appear
+ s = cluster[1].connect().session();
+ self.assert_browse(s, "t1", ["a"])
+ self.assert_browse(s, "t2", ["a"])
+
+ def test_dtx_timeout(self):
+ """Verify that dtx timeout works"""
+ cluster = self.cluster(1)
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.start()
+ t1.set_timeout(1)
+ time.sleep(1.1)
+ try:
+ t1.end()
+ self.fail("Expected rollback timeout.")
+ except DtxStatusException, e:
+ self.assertEqual(e.actual, XA_RBTIMEOUT)
+
+class TxTests(BrokerTest):
+
+ def test_tx_update(self):
+ """Verify that transaction state is updated to a new broker"""
+
+ def make_message(session, body=None, key=None, id=None):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ return qpid.datatypes.Message(dp, mp, body)
+
+ cluster=self.cluster(1)
+ # Use old API. TX is not supported in messaging API.
+ c = cluster[0].connect_old()
+ s = c.session("tx-session", 1)
+ s.queue_declare(queue="q")
+ # Start transaction
+ s.tx_select()
+ s.message_transfer(message=make_message(s, "1", "q"))
+ # Start new member mid-transaction
+ cluster.start()
+ # Do more work
+ s.message_transfer(message=make_message(s, "2", "q"))
+ # Commit the transaction and verify the results.
+ s.tx_commit()
+ for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -1001,6 +1382,8 @@ class LongTests(BrokerTest):
logger = logging.getLogger()
log_level = logger.getEffectiveLevel()
logger.setLevel(logging.ERROR)
+ sender = None
+ receiver = None
try:
# Start sender and receiver threads
receiver = Receiver(cluster[0], "q;{create:always}")
@@ -1031,8 +1414,8 @@ class LongTests(BrokerTest):
finally:
# Detach to avoid slow reconnect attempts during shut-down if test fails.
- sender.connection.detach()
- receiver.connection.detach()
+ if sender: sender.connection.detach()
+ if receiver: receiver.connection.detach()
logger.setLevel(log_level)
def test_msg_group_failover(self):
diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp
index 3536ffddbe..3e844b4e58 100644
--- a/qpid/cpp/src/tests/exception_test.cpp
+++ b/qpid/cpp/src/tests/exception_test.cpp
@@ -92,32 +92,30 @@ QPID_AUTO_TEST_CASE(TestSessionBusy) {
}
QPID_AUTO_TEST_CASE(DisconnectedPop) {
- ProxySessionFixture fix;
- ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
+ SessionFixture fix;
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(fix.lq, "q");
Catcher<TransportFailure> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
- fix.connection.proxy.close();
+ fix.shutdownBroker();
BOOST_CHECK(pop.join());
}
QPID_AUTO_TEST_CASE(DisconnectedListen) {
- ProxySessionFixture fix;
+ SessionFixture fix;
struct NullListener : public MessageListener {
void received(Message&) { BOOST_FAIL("Unexpected message"); }
} l;
- ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
fix.session.queueDeclare(arg::queue="q");
fix.subs.subscribe(l, "q");
Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
- fix.connection.proxy.close();
- runner.join();
+ fix.shutdownBroker();
+ runner.join();
BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
- ProxySessionFixture fix;
+ SessionFixture fix;
ScopedSuppressLogging sl; // Suppress messages for expected errors.
BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
}
diff --git a/qpid/cpp/src/tests/federated_topic_test b/qpid/cpp/src/tests/federated_topic_test
index b1063c7e8c..2e55ddcfaa 100755
--- a/qpid/cpp/src/tests/federated_topic_test
+++ b/qpid/cpp/src/tests/federated_topic_test
@@ -42,13 +42,12 @@ while getopts "s:m:b:" opt ; do
esac
done
-MY_DIR=$(dirname $(which $0))
source ./test_env.sh
trap stop_brokers EXIT
start_broker() {
- ${MY_DIR}/../qpidd --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port
+ $QPIDD_EXEC --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port
}
start_brokers() {
@@ -76,39 +75,39 @@ subscribe() {
echo Subscriber $1 connecting on $MY_PORT
LOG="subscriber_$1.log"
- ${MY_DIR}/topic_listener -p $MY_PORT > $LOG 2>&1 && rm -f $LOG
+ ./qpid-topic-listener -p $MY_PORT > $LOG 2>&1 && rm -f $LOG
}
publish() {
- ${MY_DIR}/topic_publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A
+ ./qpid-topic-publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A
}
setup_routes() {
- BROKER_A="localhost:$PORT_A"
- BROKER_B="localhost:$PORT_B"
- BROKER_C="localhost:$PORT_C"
+ BROKER_A="daffodil:$PORT_A"
+ BROKER_B="daffodil:$PORT_B"
+ BROKER_C="daffodil:$PORT_C"
if (($VERBOSE)); then
echo "Establishing routes for topic..."
fi
- $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_A amq.topic topic_control B B
- $PYTHON_COMMANDS/qpid-route route add $BROKER_C $BROKER_B amq.topic topic_control C C
+ $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_A amq.topic topic_control B B
+ $QPID_ROUTE_EXEC route add $BROKER_C $BROKER_B amq.topic topic_control C C
if (($VERBOSE)); then
echo "linked A->B->C"
fi
- $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.topic topic_control B B
- $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.topic topic_control A A
+ $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.topic topic_control B B
+ $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.topic topic_control A A
if (($VERBOSE)); then
echo "linked C->B->A"
echo "Establishing routes for response queue..."
fi
- $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.direct response B B
- $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.direct response A A
+ $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.direct response B B
+ $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.direct response A A
if (($VERBOSE)); then
echo "linked C->B->A"
for b in $BROKER_A $BROKER_B $BROKER_C; do
echo "Routes for $b"
- $PYTHON_COMMANDS/qpid-route route list $b
+ $QPID_ROUTE_EXEC route list $b
done
fi
}
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 49bdecdd95..7d613b98ce 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -111,18 +111,18 @@ class FederationTests(TestBase010):
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
result = bridge.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
result = link.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -133,11 +133,11 @@ class FederationTests(TestBase010):
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -165,9 +165,9 @@ class FederationTests(TestBase010):
except Empty: None
result = bridge.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
result = link.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -178,11 +178,11 @@ class FederationTests(TestBase010):
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -209,9 +209,9 @@ class FederationTests(TestBase010):
except Empty: None
result = bridge.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
result = link.close()
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -236,11 +236,11 @@ class FederationTests(TestBase010):
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
- self.assertEqual(result.status, 0)
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(3)
@@ -262,6 +262,63 @@ class FederationTests(TestBase010):
except Empty: None
result = bridge.close()
+ self.assertEqual(result.status, 0, result)
+ result = link.close()
+ self.assertEqual(result.status, 0, result)
+
+ self.verify_cleanup()
+
+ def test_pull_from_queue_recovery(self):
+ session = self.session
+
+ #setup queue on remote broker and add some messages
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_pull_from_queue_recovery")
+ r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+ for i in range(1, 6):
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+ #setup queue to receive messages from local broker
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0, result)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+ self.assertEqual(result.status, 0, result)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ #recreate the remote bridge queue to invalidate the bridge session
+ r_session.queue_delete (queue="my-bridge-queue", if_empty=False, if_unused=False)
+ r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+
+ #add some more messages (i.e. after bridge was created)
+ for i in range(6, 11):
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message %d'" % i)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
self.assertEqual(result.status, 0)
result = link.close()
self.assertEqual(result.status, 0)
diff --git a/qpid/cpp/src/tests/federation_sys.py b/qpid/cpp/src/tests/federation_sys.py
new file mode 100755
index 0000000000..11590f684e
--- /dev/null
+++ b/qpid/cpp/src/tests/federation_sys.py
@@ -0,0 +1,1900 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from inspect import stack
+from qpid import messaging
+from qpid.messaging import Message
+from qpid.messaging.exceptions import Empty
+from qpid.testlib import TestBase010
+from random import randint
+from sys import stdout
+from time import sleep
+
+
+class Enum(object):
+ def __init__(self, **entries):
+ self.__dict__.update(entries)
+ def __repr__(self):
+ args = ['%s=%s' % (k, repr(v)) for (k,v) in vars(self).items()]
+ return 'Enum(%s)' % ', '.join(args)
+
+
+class QmfTestBase010(TestBase010):
+
+ _brokers = []
+ _links = []
+ _bridges = []
+ _alt_exch_ops = Enum(none=0, create=1, delete=2)
+
+ class _Broker(object):
+ """
+ This broker proxy object holds the Qmf proxy to a broker of known address as well as the QMF broker
+ object, connection and sessions to the broker.
+ """
+ def __init__(self, url):
+ self.url = url # format: "host:port"
+ url_parts = url.split(':')
+ self.host = url_parts[0]
+ self.port = int(url_parts[1])
+ self.qmf_broker = None
+ self.connection = messaging.Connection.establish(self.url)
+ self.sessions = []
+ def __str__(self):
+ return "_Broker %s:%s (%d open sessions)" % (self.host, self.port, len(self.sessions))
+ def destroy(self, qmf = None):
+ if qmf is not None:
+ qmf.delBroker(self.qmf_broker.getBroker())
+ for session in self.sessions:
+ try: # Session may have been closed by broker error
+ session.close()
+ except Exception, e: print "WARNING: %s: Unable to close session %s (%s): %s %s" % (self, session, hex(id(session)), type(e), e)
+ try: # Connection may have been closed by broker error
+ self.connection.close()
+ except Exception, e: print "WARNING: %s: Unable to close connection %s (%s): %s %s" % (self, self.connection, hex(id(self.connection)), type(e), e)
+ def session(self, name, transactional_flag = False):
+ session = self.connection.session(name, transactional_flag)
+ self.sessions.append(session)
+ return session
+
+ def setUp(self):
+ """
+ Called one before each test starts
+ """
+ TestBase010.setUp(self)
+ self.startQmf();
+
+ def tearDown(self):
+ """
+ Called once after each test competes. Close all Qmf objects (bridges, links and brokers)
+ """
+ while len(self._bridges):
+ self._bridges.pop().close()
+ while len(self._links):
+ self._links.pop().close()
+ while len(self._brokers):
+ b = self._brokers.pop()
+ if len(self._brokers) <= 1:
+ b.destroy(None)
+ else:
+ b.destroy(self.qmf)
+ TestBase010.tearDown(self)
+ self.qmf.close()
+
+ #--- General test utility functions
+
+ def _get_name(self):
+ """
+ Return the name of method which called this method stripped of "test_" prefix. Used for naming
+ queues and exchanges on a per-test basis.
+ """
+ return stack()[1][3][5:]
+
+ def _get_broker_port(self, key):
+ """
+ Get the port of a broker defined in the environment using -D<key>=portno
+ """
+ return int(self.defines[key])
+
+ def _get_cluster_ports(self, key):
+ """
+ Get the cluster ports from the parameters of the test which place it in the environment using
+ -D<key>="port0 port1 ... portN" (space-separated)
+ """
+ ports = []
+ ports_str = self.defines[key]
+ if ports_str:
+ for p in ports_str.split():
+ ports.append(int(p))
+ return ports
+
+ def _get_send_address(self, exch_name, queue_name):
+ """
+ Get an address to which to send messages based on the exchange name and queue name, but taking into account
+ that the exchange name may be "" (the default exchange), in whcih case the format changes slightly.
+ """
+ if len(exch_name) == 0: # Default exchange
+ return queue_name
+ return "%s/%s" % (exch_name, queue_name)
+
+ def _get_broker(self, cluster_flag, broker_port_key, cluster_ports_key):
+ """
+ Read the port numbers for pre-started brokers from the environment using keys, then find or create and return
+ the Qmf broker proxy for the appropriate broker
+ """
+ if cluster_flag:
+ port = self._get_cluster_ports(cluster_ports_key)[0] # Always use the first node in the cluster
+ else:
+ port = self._get_broker_port(broker_port_key)
+ return self._find_create_broker("localhost:%s" % port)
+
+ def _get_msg_subject(self, topic_key):
+ """
+ Return an appropriate subject for sending a message to a known topic. Return None if there is no topic.
+ """
+ if len(topic_key) == 0: return None
+ if "*" in topic_key: return topic_key.replace("*", "test")
+ if "#" in topic_key: return topic_key.replace("#", "multipart.test")
+ return topic_key
+
+ def _send_msgs(self, session_name, broker, addr, msg_count, msg_content = "Message_%03d", topic_key = "",
+ msg_durable_flag = False, enq_txn_size = 0):
+ """
+ Send messages to a broker using address addr
+ """
+ send_session = broker.session(session_name, transactional_flag = enq_txn_size > 0)
+ sender = send_session.sender(addr)
+ txn_cnt = 0
+ for i in range(0, msg_count):
+ sender.send(Message(msg_content % (i + 1), subject = self._get_msg_subject(topic_key), durable = msg_durable_flag))
+ if enq_txn_size > 0:
+ txn_cnt += 1
+ if txn_cnt >= enq_txn_size:
+ send_session.commit()
+ txn_cnt = 0
+ if enq_txn_size > 0 and txn_cnt > 0:
+ send_session.commit()
+ sender.close()
+ send_session.close()
+
+ def _receive_msgs(self, session_name, broker, addr, msg_count, msg_content = "Message_%03d", deq_txn_size = 0,
+ timeout = 0):
+ """
+ Receive messages from a broker
+ """
+ receive_session = broker.session(session_name, transactional_flag = deq_txn_size > 0)
+ receiver = receive_session.receiver(addr)
+ txn_cnt = 0
+ for i in range(0, msg_count):
+ try:
+ msg = receiver.fetch(timeout = timeout)
+ if deq_txn_size > 0:
+ txn_cnt += 1
+ if txn_cnt >= deq_txn_size:
+ receive_session.commit()
+ txn_cnt = 0
+ receive_session.acknowledge()
+ except Empty:
+ if deq_txn_size > 0: receive_session.rollback()
+ receiver.close()
+ receive_session.close()
+ if i == 0:
+ self.fail("Broker %s queue \"%s\" is empty" % (broker.qmf_broker.getBroker().getUrl(), addr))
+ else:
+ self.fail("Unable to receive message %d from broker %s queue \"%s\"" % (i, broker.qmf_broker.getBroker().getUrl(), addr))
+ if msg.content != msg_content % (i + 1):
+ receiver.close()
+ receive_session.close()
+ self.fail("Unexpected message \"%s\", was expecting \"%s\"." % (msg.content, msg_content % (i + 1)))
+ try:
+ msg = receiver.fetch(timeout = 0)
+ if deq_txn_size > 0: receive_session.rollback()
+ receiver.close()
+ receive_session.close()
+ self.fail("Extra message \"%s\" found on broker %s address \"%s\"" % (msg.content, broker.qmf_broker.getBroker().getUrl(), addr))
+ except Empty:
+ pass
+ if deq_txn_size > 0 and txn_cnt > 0:
+ receive_session.commit()
+ receiver.close()
+ receive_session.close()
+
+ #--- QMF-specific utility functions
+
+ def _get_qmf_property(self, props, key):
+ """
+ Get the value of a named property key kj from a property list [(k0, v0), (k1, v1), ... (kn, vn)].
+ """
+ for k,v in props:
+ if k.name == key:
+ return v
+ return None
+
+ def _check_qmf_return(self, method_result):
+ """
+ Check the result of a Qmf-defined method call
+ """
+ self.assertTrue(method_result.status == 0, method_result.text)
+
+ def _check_optional_qmf_property(self, qmf_broker, type, qmf_object, key, expected_val, obj_ref_flag):
+ """
+ Optional Qmf properties don't show up in the properties list when they are not specified. Checks for
+ these property types involve searching the properties list and making sure it is present or not as
+ expected.
+ """
+ val = self._get_qmf_property(qmf_object.getProperties(), key)
+ if val is None:
+ if len(expected_val) > 0:
+ self.fail("%s %s exists, but has does not have %s property. Expected value: \"%s\"" %
+ (type, qmf_object.name, key, expected_val))
+ else:
+ if len(expected_val) > 0:
+ if obj_ref_flag:
+ obj = self.qmf.getObjects(_objectId = val, _broker = qmf_broker.getBroker())
+ self.assertEqual(len(obj), 1, "More than one object with the same objectId: %s" % obj)
+ val = obj[0].name
+ self.assertEqual(val, expected_val, "%s %s exists, but has incorrect %s property. Found \"%s\", expected \"%s\"" %
+ (type, qmf_object.name, key, val, expected_val))
+ else:
+ self.fail("%s %s exists, but has an unexpected %s property \"%s\" set." % (type, qmf_object.name, key, val))
+
+ #--- Find/create Qmf broker objects
+
+ def _find_qmf_broker(self, url):
+ """
+ Find the Qmf broker object for the given broker URL. The broker must have been previously added to Qmf through
+ addBroker()
+ """
+ for b in self.qmf.getObjects(_class="broker"):
+ if b.getBroker().getUrl() == url:
+ return b
+ return None
+
+ def _find_create_broker(self, url):
+ """
+ Find a running broker through Qmf. If it does not exist, add it (assuming the broker is already running).
+ """
+ broker = self._Broker(url)
+ self._brokers.append(broker)
+ if self.qmf is not None:
+ qmf_broker = self._find_qmf_broker(broker.url)
+ if qmf_broker is None:
+ self.qmf.addBroker(broker.url)
+ broker.qmf_broker = self._find_qmf_broker(broker.url)
+ else:
+ broker.qmf_broker = qmf_broker
+ return broker
+
+ #--- Find/create/delete exchanges
+
+ def _find_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete):
+ """
+ Find Qmf exchange object
+ """
+ for e in self.qmf.getObjects(_class="exchange", _broker = qmf_broker.getBroker()):
+ if e.name == name:
+ if len(name) == 0 or (len(name) >= 4 and name[:4] == "amq."): return e # skip checks for special exchanges
+ self.assertEqual(e.type, type,
+ "Exchange \"%s\" exists, but is of unexpected type %s; expected type %s." %
+ (name, e.type, type))
+ self._check_optional_qmf_property(qmf_broker, "Exchange", e, "altExchange", alternate, True)
+ self.assertEqual(e.durable, durable,
+ "Exchange \"%s\" exists, but has incorrect durability. Found durable=%s, expected durable=%s" %
+ (name, e.durable, durable))
+ self.assertEqual(e.autoDelete, auto_delete,
+ "Exchange \"%s\" exists, but has incorrect auto-delete property. Found %s, expected %s" %
+ (name, e.autoDelete, auto_delete))
+ return e
+ return None
+
+ def _find_create_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete, args):
+ """
+ Find Qmf exchange object if exchange exists, create exchange and return its Qmf object if not
+ """
+ e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete)
+ if e is not None: return e
+ # Does not exist, so create it
+ props = dict({"exchange-type": type, "type": type, "durable": durable, "auto-delete": auto_delete, "alternate-exchange": alternate}, **args)
+ self._check_qmf_return(qmf_broker.create(type="exchange", name=name, properties=props, strict=True))
+ e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete)
+ self.assertNotEqual(e, None, "Creation of exchange %s on broker %s failed" % (name, qmf_broker.getBroker().getUrl()))
+ return e
+
+ def _find_delete_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete):
+ """
+ Find and delete Qmf exchange object if it exists
+ """
+ e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete)
+ if e is not None and not auto_delete:
+ self._check_qmf_return(qmf_broker.delete(type="exchange", name=name, options={}))
+
+ #--- Find/create/delete queues
+
+ def _find_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete):
+ """
+ Find a Qmf queue object
+ """
+ for q in self.qmf.getObjects(_class="queue", _broker = qmf_broker.getBroker()):
+ if q.name == name:
+ self._check_optional_qmf_property(qmf_broker, "Queue", q, "altExchange", alternate_exchange, True)
+ self.assertEqual(q.durable, durable,
+ "Queue \"%s\" exists, but has incorrect durable property. Found %s, expected %s" %
+ (name, q.durable, durable))
+ self.assertEqual(q.exclusive, exclusive,
+ "Queue \"%s\" exists, but has incorrect exclusive property. Found %s, expected %s" %
+ (name, q.exclusive, exclusive))
+ self.assertEqual(q.autoDelete, auto_delete,
+ "Queue \"%s\" exists, but has incorrect auto-delete property. Found %s, expected %s" %
+ (name, q.autoDelete, auto_delete))
+ return q
+ return None
+
+ def _find_create_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete, args):
+ """
+ Find Qmf queue object if queue exists, create queue and return its Qmf object if not
+ """
+ q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete)
+ if q is not None: return q
+ # Queue does not exist, so create it
+ props = dict({"durable": durable, "auto-delete": auto_delete, "exclusive": exclusive, "alternate-exchange": alternate_exchange}, **args)
+ self._check_qmf_return(qmf_broker.create(type="queue", name=name, properties=props, strict=True))
+ q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete)
+ self.assertNotEqual(q, None, "Creation of queue %s on broker %s failed" % (name, qmf_broker.getBroker().getUrl()))
+ return q
+
+ def _find_delete_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete, args):
+ """
+ Find and delete Qmf queue object if it exists
+ """
+ q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete)
+ if q is not None and not auto_delete:
+ self._check_qmf_return(qmf_broker.delete(type="queue", name=name, options={}))
+
+ #--- Find/create/delete bindings (between an exchange and a queue)
+
+ def _find_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args):
+ """
+ Find a Qmf binding object
+ """
+ for b in self.qmf.getObjects(_class="binding", _broker = qmf_broker.getBroker()):
+ if b.exchangeRef == qmf_exchange.getObjectId() and b.queueRef == qmf_queue.getObjectId():
+ if qmf_exchange.type != "fanout": # Fanout ignores the binding key, and always returns "" as the key
+ self.assertEqual(b.bindingKey, binding_key,
+ "Binding between exchange %s and queue %s exists, but has mismatching binding key: Found %s, expected %s." %
+ (qmf_exchange.name, qmf_queue.name, b.bindingKey, binding_key))
+ self.assertEqual(b.arguments, binding_args,
+ "Binding between exchange %s and queue %s exists, but has mismatching arguments: Found %s, expected %s" %
+ (qmf_exchange.name, qmf_queue.name, b.arguments, binding_args))
+ return b
+ return None
+
+ def _find_create_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args):
+ """
+ Find Qmf binding object if it exists, create binding and return its Qmf object if not
+ """
+ b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args)
+ if b is not None: return b
+ # Does not exist, so create it
+ self._check_qmf_return(qmf_broker.create(type="binding", name="%s/%s/%s" % (qmf_exchange.name, qmf_queue.name, binding_key), properties=binding_args, strict=True))
+ b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args)
+ self.assertNotEqual(b, None, "Creation of binding between exchange %s and queue %s with key %s failed" %
+ (qmf_exchange.name, qmf_queue.name, binding_key))
+ return b
+
+ def _find_delete_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args):
+ """
+ Find and delete Qmf binding object if it exists
+ """
+ b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args)
+ if b is not None:
+ if len(qmf_exchange.name) > 0: # not default exchange
+ self._check_qmf_return(qmf_broker.delete(type="binding", name="%s/%s/%s" % (qmf_exchange.name, qmf_queue.name, binding_key), options={}))
+
+ #--- Find/create a link
+
+ def _find_qmf_link(self, qmf_from_broker_proxy, host, port):
+ """
+ Find a Qmf link object
+ """
+ for l in self.qmf.getObjects(_class="link", _broker=qmf_from_broker_proxy):
+ if l.host == host and l.port == port:
+ return l
+ return None
+
+ def _find_create_qmf_link(self, qmf_from_broker, qmf_to_broker_proxy, link_durable_flag, auth_mechanism, user_id,
+ password, transport, pause_interval, link_ready_timeout):
+ """
+ Find a Qmf link object if it exists, create it and return its Qmf link object if not
+ """
+ to_broker_host = qmf_to_broker_proxy.host
+ to_broker_port = qmf_to_broker_proxy.port
+ l = self._find_qmf_link(qmf_from_broker.getBroker(), to_broker_host, to_broker_port)
+ if l is not None: return l
+ # Does not exist, so create it
+ self._check_qmf_return(qmf_from_broker.connect(to_broker_host, to_broker_port, link_durable_flag, auth_mechanism, user_id, password, transport))
+ l = self._find_qmf_link(qmf_from_broker.getBroker(), to_broker_host, to_broker_port)
+ self.assertNotEqual(l, None, "Creation of link from broker %s to broker %s failed" %
+ (qmf_from_broker.getBroker().getUrl(), qmf_to_broker_proxy.getUrl()))
+ self._wait_for_link(l, pause_interval, link_ready_timeout)
+ return l
+
+ def _wait_for_link(self, link, pause_interval, link_ready_timeout):
+ """
+ Wait for link to become active (state=Operational)
+ """
+ tot_time = 0
+ link.update()
+ if link.state == "":
+ # Link mgmt updates for the c++ link object are disabled when in a cluster because of inconsistent state:
+ # one is "Operational", the other "Passive". In this case, wait a bit and hope for the best...
+ sleep(2*pause_interval)
+ else:
+ while link.state != "Operational" and tot_time < link_ready_timeout:
+ sleep(pause_interval)
+ tot_time += pause_interval
+ link.update()
+ self.assertEqual(link.state, "Operational", "Timeout: Link not operational, state=%s" % link.state)
+
+ #--- Find/create a bridge
+
+ def _find_qmf_bridge(self, qmf_broker_proxy, qmf_link, source, destination, key):
+ """
+ Find a Qmf link object
+ """
+ for b in self.qmf.getObjects(_class="bridge", _broker=qmf_broker_proxy):
+ if b.linkRef == qmf_link.getObjectId() and b.src == source and b.dest == destination and b.key == key:
+ return b
+ return None
+
+ def _find_create_qmf_bridge(self, qmf_broker_proxy, qmf_link, queue_name, exch_name, topic_key,
+ queue_route_type_flag, bridge_durable_flag):
+ """
+ Find a Qmf bridge object if it exists, create it and return its Qmf object if not
+ """
+ if queue_route_type_flag:
+ src = queue_name
+ dest = exch_name
+ key = ""
+ else:
+ src = exch_name
+ dest = exch_name
+ if len(topic_key) > 0:
+ key = topic_key
+ else:
+ key = queue_name
+ b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key)
+ if b is not None:
+ return b
+ # Does not exist, so create it
+ self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1))
+ b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key)
+ self.assertNotEqual(b, None, "Bridge creation failed: src=%s dest=%s key=%s" % (src, dest, key))
+ return b
+
+ def _wait_for_bridge(self, bridge, src_broker, dest_broker, exch_name, queue_name, topic_key, pause_interval,
+ bridge_ready_timeout):
+ """
+ Wait for bridge to become active by sending messages over the bridge at 1 sec intervals until they are
+ observed at the destination.
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.session("tx")
+ sender = send_session.sender(self._get_send_address(exch_name, queue_name))
+ src_receive_session = src_broker.session("src_rx")
+ src_receiver = src_receive_session.receiver(queue_name)
+ dest_receive_session = dest_broker.session("dest_rx")
+ dest_receiver = dest_receive_session.receiver(queue_name)
+ while not active and tot_time < bridge_ready_timeout:
+ sender.send(Message("xyz123", subject = self._get_msg_subject(topic_key)))
+ try:
+ src_receiver.fetch(timeout = 0)
+ src_receive_session.acknowledge()
+ # Keep receiving msgs, as several may have accumulated
+ while True:
+ dest_receiver.fetch(timeout = 0)
+ dest_receive_session.acknowledge()
+ sleep(1)
+ active = True
+ except Empty:
+ sleep(pause_interval)
+ tot_time += pause_interval
+ dest_receiver.close()
+ dest_receive_session.close()
+ src_receiver.close()
+ src_receive_session.close()
+ sender.close()
+ send_session.close()
+ self.assertTrue(active, "Bridge failed to become active after %ds: %s" % (bridge_ready_timeout, bridge))
+
+ #--- Find/create/delete utility functions
+
+ def _create_and_bind(self, qmf_broker, exchange_args, queue_args, binding_args):
+ """
+ Create a binding between a named exchange and queue on a broker
+ """
+ e = self._find_create_qmf_exchange(qmf_broker, **exchange_args)
+ q = self._find_create_qmf_queue(qmf_broker, **queue_args)
+ return self._find_create_qmf_binding(qmf_broker, e, q, **binding_args)
+
+ def _check_alt_exchange(self, qmf_broker, alt_exch_name, alt_exch_type, alt_exch_op):
+ """
+ Check for existence of alternate exchange. Return the Qmf exchange proxy object for the alternate exchange
+ """
+ if len(alt_exch_name) == 0: return None
+ if alt_exch_op == _alt_exch_ops.create:
+ return self._find_create_qmf_exchange(qmf_broker=qmf_broker, name=alt_exch_name, type=alt_exch_type,
+ alternate="", durable=False, auto_delete=False, args={})
+ if alt_exch_op == _alt_exch_ops.delete:
+ return self._find_delete_qmf_exchange(qmf_broker=qmf_broker, name=alt_exch_name, type=alt_exch_type,
+ alternate="", durable=False, auto_delete=False)
+ return self._find_qmf_exchange(qmf_broker=qmf_broker, name=alt_exchange_name, type=alt_exchange_type,
+ alternate="", durable=False, auto_delete=False)
+
+ def _delete_queue_binding(self, qmf_broker, exchange_args, queue_args, binding_args):
+ """
+ Delete a queue and the binding between it and the exchange
+ """
+ e = self._find_qmf_exchange(qmf_broker, exchange_args["name"], exchange_args["type"], exchange_args["alternate"], exchange_args["durable"], exchange_args["auto_delete"])
+ q = self._find_qmf_queue(qmf_broker, queue_args["name"], queue_args["alternate_exchange"], queue_args["durable"], queue_args["exclusive"], queue_args["auto_delete"])
+ self._find_delete_qmf_binding(qmf_broker, e, q, **binding_args)
+ self._find_delete_qmf_queue(qmf_broker, **queue_args)
+
+ def _create_route(self, queue_route_type_flag, src_broker, dest_broker, exch_name, queue_name, topic_key,
+ link_durable_flag, bridge_durable_flag, auth_mechanism, user_id, password, transport,
+ pause_interval = 1, link_ready_timeout = 20, bridge_ready_timeout = 20):
+ """
+ Create a route from a source broker to a destination broker
+ """
+ l = self._find_create_qmf_link(dest_broker.qmf_broker, src_broker.qmf_broker.getBroker(), link_durable_flag,
+ auth_mechanism, user_id, password, transport, pause_interval, link_ready_timeout)
+ self._links.append(l)
+ b = self._find_create_qmf_bridge(dest_broker.qmf_broker.getBroker(), l, queue_name, exch_name, topic_key,
+ queue_route_type_flag, bridge_durable_flag)
+ self._bridges.append(b)
+ self._wait_for_bridge(b, src_broker, dest_broker, exch_name, queue_name, topic_key, pause_interval, bridge_ready_timeout)
+
+ # Parameterized test - entry point for tests
+
+ def _do_test(self,
+ test_name, # Name of test
+ exch_name = "amq.direct", # Remote exchange name
+ exch_type = "direct", # Remote exchange type
+ exch_alt_exch = "", # Remote exchange alternate exchange
+ exch_alt_exch_type = "direct", # Remote exchange alternate exchange type
+ exch_durable_flag = False, # Remote exchange durability
+ exch_auto_delete_flag = False, # Remote exchange auto-delete property
+ exch_x_args = {}, # Remote exchange args
+ queue_alt_exch = "", # Remote queue alternate exchange
+ queue_alt_exch_type = "direct", # Remote queue alternate exchange type
+ queue_durable_flag = False, # Remote queue durability
+ queue_exclusive_flag = False, # Remote queue exclusive property
+ queue_auto_delete_flag = False, # Remote queue auto-delete property
+ queue_x_args = {}, # Remote queue args
+ binding_durable_flag = False, # Remote binding durability
+ binding_x_args = {}, # Remote binding args
+ topic_key = "", # Binding key For remote topic exchanges only
+ msg_count = 10, # Number of messages to send
+ msg_durable_flag = False, # Message durability
+ link_durable_flag = False, # Route link durability
+ bridge_durable_flag = False, # Route bridge durability
+ queue_route_type_flag = False, # Route type: false = bridge route, true = queue route
+ enq_txn_size = 0, # Enqueue transaction size, 0 = no transactions
+ deq_txn_size = 0, # Dequeue transaction size, 0 = no transactions
+ local_cluster_flag = False, # Use a node from the local cluster, otherwise use single local broker
+ remote_cluster_flag = False, # Use a node from the remote cluster, otherwise use single remote broker
+ alt_exch_op = _alt_exch_ops.create,# Op on alt exch [create (ensure present), delete (ensure not present), none (neither create nor delete)]
+ auth_mechanism = "", # Authorization mechanism for linked broker
+ user_id = "", # User ID for authorization on linked broker
+ password = "", # Password for authorization on linked broker
+ transport = "tcp" # Transport for route to linked broker
+ ):
+ """
+ Parameterized federation test. Sets up a federated link between a source broker and a destination broker and
+ checks that messages correctly pass over the link to the destination. Where appropriate (non-queue-routes), also
+ checks for the presence of messages on the source broker.
+
+ In these tests, the concept is to create a LOCAL broker, then create a link to a REMOTE broker using federation.
+ In other words, the messages sent to the LOCAL broker will be replicated on the REMOTE broker, and tests are
+ performed on the REMOTE broker to check that the required messages are present. In the case of regular routes,
+ the LOCAL broker will also retain the messages, and a similar test is performed on this broker.
+
+ TODO: There are several items to improve here:
+ 1. _do_test() is rather general. Rather create a version for each exchange type and test the exchange/queue
+ interaction in more detail based on the exchange type
+ 2. Add a headers and an xml exchange type
+ 3. Restructure the tests to start and stop brokers and clusters directly rather than relying on previously
+ started brokers. Then persistence can be checked by stopping and restarting the brokers/clusters. In particular,
+ test the persistence of links and bridges, both of which take a persistence flag.
+ 4. Test the behavior of the alternate exchanges when messages are sourced through a link. Also check behavior
+ when the alternate exchange is not present or is deleted after the reference is made.
+ 5. Test special queue types (eg LVQ)
+ """
+ local_broker = self._get_broker(local_cluster_flag, "local-port", "local-cluster-ports")
+ remote_broker = self._get_broker(remote_cluster_flag, "remote-port", "remote-cluster-ports")
+
+ # Check alternate exchanges exist (and create them if not) on both local and remote brokers
+ self._check_alt_exchange(local_broker.qmf_broker, exch_alt_exch, exch_alt_exch_type, alt_exch_op)
+ self._check_alt_exchange(local_broker.qmf_broker, queue_alt_exch, queue_alt_exch_type, alt_exch_op)
+ self._check_alt_exchange(remote_broker.qmf_broker, exch_alt_exch, exch_alt_exch_type, alt_exch_op)
+ self._check_alt_exchange(remote_broker.qmf_broker, queue_alt_exch, queue_alt_exch_type, alt_exch_op)
+
+ queue_name = "queue_%s" % test_name
+ exchange_args = {"name": exch_name, "type": exch_type, "alternate": exch_alt_exch,
+ "durable": exch_durable_flag, "auto_delete": exch_auto_delete_flag, "args": exch_x_args}
+ queue_args = {"name": queue_name, "alternate_exchange": queue_alt_exch, "durable": queue_durable_flag,
+ "exclusive": queue_exclusive_flag, "auto_delete": queue_auto_delete_flag, "args": queue_x_args}
+ binding_args = {"binding_args": binding_x_args}
+ if exch_type == "topic":
+ self.assertTrue(len(topic_key) > 0, "Topic exchange selected, but no topic key was set.")
+ binding_args["binding_key"] = topic_key
+ elif exch_type == "direct":
+ binding_args["binding_key"] = queue_name
+ else:
+ binding_args["binding_key"] = ""
+ self._create_and_bind(qmf_broker=local_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args)
+ self._create_and_bind(qmf_broker=remote_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args)
+ self._create_route(queue_route_type_flag, local_broker, remote_broker, exch_name, queue_name, topic_key,
+ link_durable_flag, bridge_durable_flag, auth_mechanism, user_id, password, transport)
+
+ self._send_msgs("send_session", local_broker, addr = self._get_send_address(exch_name, queue_name),
+ msg_count = msg_count, topic_key = topic_key, msg_durable_flag = msg_durable_flag, enq_txn_size = enq_txn_size)
+ if not queue_route_type_flag:
+ self._receive_msgs("local_receive_session", local_broker, addr = queue_name, msg_count = msg_count, deq_txn_size = deq_txn_size)
+ self._receive_msgs("remote_receive_session", remote_broker, addr = queue_name, msg_count = msg_count, deq_txn_size = deq_txn_size, timeout = 5)
+
+ # Clean up
+ self._delete_queue_binding(qmf_broker=local_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args)
+ self._delete_queue_binding(qmf_broker=remote_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args)
+
+class A_ShortTests(QmfTestBase010):
+
+ def test_route_defaultExch(self):
+ self._do_test(self._get_name())
+
+ def test_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True)
+
+
+class A_LongTests(QmfTestBase010):
+
+ def test_route_amqDirectExch(self):
+ self._do_test(self._get_name(), exch_name="amq.direct")
+
+ def test_queueRoute_amqDirectExch(self):
+ self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True)
+
+
+ def test_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange")
+
+ def test_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True)
+
+
+ def test_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout")
+
+ def test_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True)
+
+
+ def test_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#")
+
+ def test_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True)
+
+
+class B_ShortTransactionTests(QmfTestBase010):
+
+ def test_txEnq01_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_txDeq01_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+class B_LongTransactionTests(QmfTestBase010):
+
+ def test_txEnq10_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+
+
+
+ def test_txEnq01_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq10_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq01_txDeq01_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+ def test_txEnq01_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq10_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq01_txDeq01_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+ def test_txEnq01_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq10_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq01_txDeq01_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+class C_ShortClusterTests(QmfTestBase010):
+
+ def test_locCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class C_LongClusterTests(QmfTestBase010):
+
+ def test_locCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_locCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_locCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class D_ShortClusterTransactionTests(QmfTestBase010):
+
+ def test_txEnq01_locCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class D_LongClusterTransactionTests(QmfTestBase010):
+
+ def test_txEnq10_locCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_defaultExch(self):
+ self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_defaultExch(self):
+ self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_txEnq01_locCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_txEnq01_locCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_txEnq01_locCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class E_ShortPersistenceTests(QmfTestBase010):
+
+ def test_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True)
+
+ def test_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True)
+
+ def test_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True)
+
+ def test_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True)
+
+
+class E_LongPersistenceTests(QmfTestBase010):
+
+
+ def test_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True)
+
+ def test_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True)
+
+ def test_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True)
+
+ def test_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True)
+
+
+ def test_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True)
+
+ def test_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True)
+
+ def test_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True)
+
+ def test_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True)
+
+
+ def test_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True)
+
+ def test_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True)
+
+ def test_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True)
+
+ def test_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True)
+
+
+class F_ShortPersistenceTransactionTests(QmfTestBase010):
+
+ def test_txEnq01_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_txDeq01_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+class F_LongPersistenceTransactionTests(QmfTestBase010):
+
+ def test_txEnq10_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+
+
+
+ def test_txEnq01_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq10_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq01_txDeq01_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+ def test_txEnq01_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq10_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq01_txDeq01_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+ def test_txEnq01_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq01_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1)
+
+ def test_txEnq10_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq10_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103)
+
+ def test_txEnq01_txDeq01_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+ def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1)
+
+
+class G_ShortPersistenceClusterTests(QmfTestBase010):
+
+ def test_locCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class G_LongPersistenceClusterTests(QmfTestBase010):
+
+
+
+ def test_locCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_locCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_locCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_locCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True)
+
+ def test_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class H_ShortPersistenceClusterTransactionTests(QmfTestBase010):
+
+ def test_txEnq01_locCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+class H_LongPersistenceClusterTransactionTests(QmfTestBase010):
+
+ def test_txEnq10_locCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self):
+ self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+
+
+ def test_txEnq01_locCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self):
+ self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_txEnq01_locCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self):
+ self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+
+ def test_txEnq01_locCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
+ def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self):
+ self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True)
+
diff --git a/qpid/cpp/src/tests/ipv6_test b/qpid/cpp/src/tests/ipv6_test
new file mode 100755
index 0000000000..d75d50fd0a
--- /dev/null
+++ b/qpid/cpp/src/tests/ipv6_test
@@ -0,0 +1,150 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run a simple test over IPv6
+source ./test_env.sh
+
+CONFIG=$(dirname $0)/config.null
+TEST_HOSTNAME=::1
+COUNT=10
+
+trap cleanup EXIT
+
+error() { echo $*; exit 1; }
+
+# Don't need --no-module-dir or --no-data-dir as they are set as env vars in test_env.sh
+COMMON_OPTS="--daemon --auth no --config $CONFIG"
+
+# Record all broker ports started
+unset PORTS
+declare -a PORTS
+
+# Start new brokers:
+# $1 must be integer
+# $2 = extra opts
+# Append used ports to PORTS variable
+start_brokers() {
+ local -a ports
+ for (( i=0; $i<$1; i++)) do
+ ports[$i]=$($QPIDD_EXEC --port 0 $COMMON_OPTS $2)
+ done
+ PORTS=( ${PORTS[@]} ${ports[@]} )
+}
+
+stop_brokers() {
+ for port in "${PORTS[@]}";
+ do
+ $QPIDD_EXEC -qp $port
+ done
+ PORTS=()
+}
+
+cleanup() {
+ stop_brokers
+}
+
+start_brokers 1
+PORT=${PORTS[0]}
+echo "Started IPv6 smoke perftest on broker port $PORT"
+
+## Test connection via connection settings
+./qpid-perftest --count ${COUNT} --port ${PORT} -b $TEST_HOSTNAME --summary
+
+## Test connection with a URL
+URL="amqp:[$TEST_HOSTNAME]:$PORT"
+
+./qpid-send -b $URL --content-string=hello -a "foo;{create:always}"
+MSG=`./qpid-receive -b $URL -a "foo;{create:always}" --messages 1`
+test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; }
+
+stop_brokers
+
+# Federation smoke test follows
+
+# Start 2 brokers
+
+# In a distribution, the python tools will be absent.
+if [ ! -f $QPID_CONFIG_EXEC ] || [ ! -f $QPID_ROUTE_EXEC ] ; then
+ echo "python tools absent - skipping federation test."
+else
+
+ start_brokers 2
+ echo "Started Federated brokers on ports ${PORTS[*]}"
+ # Make broker urls
+ BROKER0="[::1]:${PORTS[0]}"
+ BROKER1="[::1]:${PORTS[1]}"
+ TEST_QUEUE=ipv6-fed-test
+
+ $QPID_CONFIG_EXEC -a $BROKER0 add queue $TEST_QUEUE
+ $QPID_CONFIG_EXEC -a $BROKER1 add queue $TEST_QUEUE
+ $QPID_ROUTE_EXEC dynamic add $BROKER1 $BROKER0 amq.direct
+ $QPID_CONFIG_EXEC -a $BROKER1 bind amq.direct $TEST_QUEUE $TEST_QUEUE
+ $QPID_ROUTE_EXEC route map $BROKER1
+
+ ./datagen --count 100 | tee rdata-in |
+ ./qpid-send -b amqp:$BROKER0 -a amq.direct/$TEST_QUEUE --content-stdin
+ ./qpid-receive -b amqp:$BROKER1 -a $TEST_QUEUE --print-content yes -m 0 > rdata-out
+
+ cmp rdata-in rdata-out || { echo "Federated data over IPv6 does not compare"; exit 1; }
+
+ stop_brokers
+ rm rdata-in rdata-out
+fi
+
+# Cluster smoke test follows
+test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported.
+
+## Test failover in a cluster using IPv6 only
+. $srcdir/ais_check # Will exit if clustering not enabled.
+
+pick_port() {
+ # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
+ # Note this method is racy
+ PICK=$($QPIDD_EXEC -dp0)
+ $QPIDD_EXEC -qp $PICK
+ echo $PICK
+}
+
+ssl_cluster_broker() { # $1 = port
+ $QPIDD_EXEC $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ipv6_test.$HOSTNAME.$$ --cluster-url amqp:[$TEST_HOSTNAME]:$1 --port $1
+ # Wait for broker to be ready
+ ./qpid-ping -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
+ echo "Running IPv6 cluster broker on port $1"
+}
+
+PORT1=`pick_port`; ssl_cluster_broker $PORT1
+PORT2=`pick_port`; ssl_cluster_broker $PORT2
+
+# Pipe receive output to uniq to remove duplicates
+./qpid-receive --connection-options "{reconnect:true, reconnect-timeout:5}" --failover-updates -b amqp:[$TEST_HOSTNAME]:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
+
+./qpid-send -b amqp:[$TEST_HOSTNAME]:$PORT2 --content-string=one -a "foo;{create:always}"
+
+$QPIDD_EXEC -qp $PORT1 # Kill broker 1 receiver should fail-over.
+./qpid-send -b amqp:[$TEST_HOSTNAME]:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1
+wait # Wait for qpid-receive
+{ echo one; echo two; } > ssl_test_receive.cmp
+diff ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; }
+
+$QPIDD_EXEC -qp $PORT2
+
+rm -f ssl_test_receive.*
+
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 6138108558..300d34774f 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -215,7 +215,8 @@ class ReadyReceiver:
raise Exception("Receiver error: %s"%(out))
raise Exception("Timed out waiting for receivers to be ready")
-def flatten(l): return sum(map(lambda s: s.split(","), l),[])
+def flatten(l):
+ return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
class RoundRobin:
def __init__(self,items):
diff --git a/qpid/cpp/src/tests/qpid-perftest.cpp b/qpid/cpp/src/tests/qpid-perftest.cpp
index 8a5cf05775..3aff742c62 100644
--- a/qpid/cpp/src/tests/qpid-perftest.cpp
+++ b/qpid/cpp/src/tests/qpid-perftest.cpp
@@ -396,7 +396,7 @@ struct Controller : public Client {
void run() { // Controller
try {
// Wait for subscribers to be ready.
- process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready"));
+ process(opts.totalSubs, fqn("sub_ready"), boost::bind(expect, _1, "ready"));
LocalQueue pubDone;
LocalQueue subDone;
@@ -510,10 +510,11 @@ struct PublishThread : public Client {
}
SubscriptionManager subs(session);
LocalQueue lq;
- subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
- subs.subscribe(lq, fqn("pub_start"));
+ subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
+ Subscription cs = subs.subscribe(lq, fqn("pub_start"));
for (size_t j = 0; j < opts.iterations; ++j) {
+ cs.grantMessageCredit(1);
expect(lq.pop().getData(), "start");
AbsTime start=now();
for (size_t i=0; i<opts.count; i++) {
diff --git a/qpid/cpp/src/tests/run_federation_sys_tests b/qpid/cpp/src/tests/run_federation_sys_tests
new file mode 100755
index 0000000000..f5f772d72e
--- /dev/null
+++ b/qpid/cpp/src/tests/run_federation_sys_tests
@@ -0,0 +1,97 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the federation system tests.
+
+source ./test_env.sh
+
+MODULENAME=federation_sys
+
+# Test for clustering
+ps -u root | grep 'aisexec\|corosync' > /dev/null
+if (( $? == 0 )); then
+ CLUSTERING_ENABLED=1
+else
+ echo "WARNING: No clustering detected; tests using it will be ignored."
+fi
+
+# Test for long test
+if [[ "$1" == "LONG_TEST" ]]; then
+ USE_LONG_TEST=1
+ shift # get rid of this param so it is not treated as a test name
+fi
+
+trap stop_brokers INT TERM QUIT
+
+SKIPTESTS="-i federation_sys.E_* -i federation_sys.F_* -i federation_sys.G_* -i federation_sys.H_*"
+if [ -z ${USE_LONG_TEST} ]; then
+ SKIPTESTS="-i federation_sys.A_Long* -i federation_sys.B_Long* ${SKIPTESTS}"
+fi
+echo "WARNING: Tests using persistence will be ignored."
+if [ -z ${CLUSTERING_ENABLED} ]; then
+ SKIPTESTS="${SKIPTESTS} -i federation_sys.C_* -i federation_sys.D_*"
+elif [ -z ${USE_LONG_TEST} ]; then
+ SKIPTESTS="${SKIPTESTS} -i federation_sys.C_Long* -i federation_sys.D_Long*"
+fi
+
+start_brokers() {
+ start_broker() {
+ ${QPIDD_EXEC} --daemon --port 0 --auth no --no-data-dir $1 > qpidd.port
+ PORT=`cat qpidd.port`
+ eval "$2=${PORT}"
+ }
+ start_broker "" LOCAL_PORT
+ start_broker "" REMOTE_PORT
+ if [ -n "${CLUSTERING_ENABLED}" ]; then
+ start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_1
+ start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_2
+ start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_1
+ start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_2
+ fi
+ rm qpidd.port
+}
+
+stop_brokers() {
+ ${QPIDD_EXEC} -q --port ${LOCAL_PORT}
+ ${QPIDD_EXEC} -q --port ${REMOTE_PORT}
+ if [ -n "${CLUSTERING_ENABLED}" ]; then
+ ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C1_1}
+ ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C2_1}
+ fi
+}
+
+if test -d ${PYTHON_DIR} ; then
+ start_brokers
+ if [ -z ${CLUSTERING_ENABLED} ]; then
+ echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT} (NOTE: clustering is DISABLED)"
+ else
+ echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT}, local cluster nodes ${CLUSTER_C1_1} ${CLUSTER_C1_2}, remote cluster nodes ${CLUSTER_C2_1} ${CLUSTER_C2_2}"
+ fi
+ if [ -z ${USE_LONG_TEST} ]; then
+ echo "NOTE: To run a full set of federation system tests, use \"make check-long\". To test with persistence, run the store version of this script."
+ fi
+ ${QPID_PYTHON_TEST} -m ${MODULENAME} ${SKIPTESTS} -b localhost:${REMOTE_PORT} -Dlocal-port=${LOCAL_PORT} -Dremote-port=${REMOTE_PORT} -Dlocal-cluster-ports="${CLUSTER_C1_1} ${CLUSTER_C1_2}" -Dremote-cluster-ports="${CLUSTER_C2_1} ${CLUSTER_C2_2}" $@
+ RETCODE=$?
+ stop_brokers
+ if test x${RETCODE} != x0; then
+ echo "FAIL federation tests"; exit 1;
+ fi
+fi
diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests
index 590f74746e..14af4807ba 100755
--- a/qpid/cpp/src/tests/run_federation_tests
+++ b/qpid/cpp/src/tests/run_federation_tests
@@ -55,7 +55,7 @@ stop_brokers() {
if test -d ${PYTHON_DIR} ; then
start_brokers
echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2"
- $QPID_PYTHON_TEST -m federation $SKIPTESTS -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
+ $QPID_PYTHON_TEST -m federation "$SKIPTESTS" -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
RETCODE=$?
stop_brokers
if test x$RETCODE != x0; then
diff --git a/qpid/cpp/src/tests/run_long_federation_sys_tests b/qpid/cpp/src/tests/run_long_federation_sys_tests
new file mode 100644
index 0000000000..69dc08d11c
--- /dev/null
+++ b/qpid/cpp/src/tests/run_long_federation_sys_tests
@@ -0,0 +1,24 @@
+#! /bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the federation system tests (long version).
+
+./run_federation_sys_tests LONG_TEST $@
diff --git a/qpid/cpp/src/tests/run_store_tests.ps1 b/qpid/cpp/src/tests/run_store_tests.ps1
index 76b46737f0..b2f0b1ccd8 100644
--- a/qpid/cpp/src/tests/run_store_tests.ps1
+++ b/qpid/cpp/src/tests/run_store_tests.ps1
@@ -111,7 +111,7 @@ Invoke-Expression "$prog --quit --port $env:QPID_PORT" | Write-Output
# Test 2... store.py starts/stops/restarts its own brokers
$tests = "*"
-$env:PYTHONPATH="$PYTHON_DIR;$srcdir"
+$env:PYTHONPATH="$PYTHON_DIR;$QMF_LIB;$srcdir"
$env:QPIDD_EXEC="$prog"
$env:STORE_LIB="$store_dir\store$suffix.dll"
if ($test_store -eq "MSSQL") {
diff --git a/qpid/cpp/src/tests/sasl_test_setup.sh b/qpid/cpp/src/tests/sasl_test_setup.sh
index 6395ba6ec3..3e69c0f02b 100755
--- a/qpid/cpp/src/tests/sasl_test_setup.sh
+++ b/qpid/cpp/src/tests/sasl_test_setup.sh
@@ -30,6 +30,7 @@ pwcheck_method: auxprop
auxprop_plugin: sasldb
sasldb_path: $PWD/sasl_config/qpidd.sasldb
sql_select: dummy select
+mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL
EOF
# Populate temporary sasl db.