From 69ae049561adde6b053c5906d0514c375814cae7 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 19 Sep 2011 15:13:18 +0000 Subject: QPID-3346: merge in latest from trunk (r1172628) git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1172657 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/BrokerFixture.h | 32 +- qpid/cpp/src/tests/CMakeLists.txt | 11 +- qpid/cpp/src/tests/ClientSessionTest.cpp | 26 +- qpid/cpp/src/tests/ExchangeTest.cpp | 2 +- qpid/cpp/src/tests/Makefile.am | 7 +- qpid/cpp/src/tests/MessageReplayTracker.cpp | 4 +- qpid/cpp/src/tests/MessagingSessionTests.cpp | 36 + qpid/cpp/src/tests/Qmf2.cpp | 104 +- qpid/cpp/src/tests/QueueEvents.cpp | 4 +- qpid/cpp/src/tests/QueuePolicyTest.cpp | 14 +- qpid/cpp/src/tests/QueueTest.cpp | 60 +- qpid/cpp/src/tests/ReplicationTest.cpp | 2 +- qpid/cpp/src/tests/SessionState.cpp | 8 +- qpid/cpp/src/tests/SocketProxy.h | 183 -- qpid/cpp/src/tests/TxPublishTest.cpp | 3 +- qpid/cpp/src/tests/Url.cpp | 26 + qpid/cpp/src/tests/XmlClientSessionTest.cpp | 2 +- qpid/cpp/src/tests/allhosts | 4 +- qpid/cpp/src/tests/brokertest.py | 33 +- .../cpp/src/tests/cluster_python_tests_failing.txt | 28 - qpid/cpp/src/tests/cluster_test_logs.py | 1 + qpid/cpp/src/tests/cluster_tests.py | 387 +++- qpid/cpp/src/tests/exception_test.cpp | 14 +- qpid/cpp/src/tests/federated_topic_test | 27 +- qpid/cpp/src/tests/federation.py | 85 +- qpid/cpp/src/tests/federation_sys.py | 1900 ++++++++++++++++++++ qpid/cpp/src/tests/ipv6_test | 150 ++ qpid/cpp/src/tests/qpid-cpp-benchmark | 3 +- qpid/cpp/src/tests/qpid-perftest.cpp | 7 +- qpid/cpp/src/tests/run_federation_sys_tests | 97 + qpid/cpp/src/tests/run_federation_tests | 2 +- qpid/cpp/src/tests/run_long_federation_sys_tests | 24 + qpid/cpp/src/tests/run_store_tests.ps1 | 2 +- qpid/cpp/src/tests/sasl_test_setup.sh | 1 + 34 files changed, 2913 insertions(+), 376 deletions(-) delete mode 100644 qpid/cpp/src/tests/SocketProxy.h create mode 100755 qpid/cpp/src/tests/federation_sys.py create mode 100755 qpid/cpp/src/tests/ipv6_test create mode 100755 qpid/cpp/src/tests/run_federation_sys_tests create mode 100644 qpid/cpp/src/tests/run_long_federation_sys_tests (limited to 'qpid/cpp/src/tests') diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index 672d954572..92c6d22b57 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -22,8 +22,6 @@ * */ -#include "SocketProxy.h" - #include "qpid/broker/Broker.h" #include "qpid/client/Connection.h" #include "qpid/client/ConnectionImpl.h" @@ -71,16 +69,15 @@ struct BrokerFixture : private boost::noncopyable { brokerThread = qpid::sys::Thread(*broker); }; - void shutdownBroker() - { - broker->shutdown(); - broker = BrokerPtr(); + void shutdownBroker() { + if (broker) { + broker->shutdown(); + brokerThread.join(); + broker = BrokerPtr(); + } } - ~BrokerFixture() { - if (broker) broker->shutdown(); - brokerThread.join(); - } + ~BrokerFixture() { shutdownBroker(); } /** Open a connection to the broker. */ void open(qpid::client::Connection& c) { @@ -97,20 +94,6 @@ struct LocalConnection : public qpid::client::Connection { ~LocalConnection() { close(); } }; -/** A local client connection via a socket proxy. */ -struct ProxyConnection : public qpid::client::Connection { - SocketProxy proxy; - ProxyConnection(int brokerPort) : proxy(brokerPort) { - open("localhost", proxy.getPort()); - } - ProxyConnection(const qpid::client::ConnectionSettings& s) : proxy(s.port) { - qpid::client::ConnectionSettings proxySettings(s); - proxySettings.port = proxy.getPort(); - open(proxySettings); - } - ~ProxyConnection() { close(); } -}; - /** Convenience class to create and open a connection and session * and some related useful objects. */ @@ -147,7 +130,6 @@ struct SessionFixtureT : BrokerFixture, ClientT { }; typedef SessionFixtureT SessionFixture; -typedef SessionFixtureT ProxySessionFixture; }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 405718f12b..cc33478114 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -264,6 +264,14 @@ add_executable (qpid-send qpid-send.cpp Statistics.cpp ${platform_test_additions target_link_libraries (qpid-send qpidmessaging) remember_location(qpid-send) +add_executable (qpid-ping qpid-ping.cpp ${platform_test_additions}) +target_link_libraries (qpid-ping qpidclient) +remember_location(qpid-ping) + +add_executable (datagen datagen.cpp ${platform_test_additions}) +target_link_libraries (datagen qpidclient) +remember_location(datagen) + # qpid-perftest and qpid-latency-test are generally useful so install them install (TARGETS qpid-perftest qpid-latency-test RUNTIME DESTINATION ${QPID_INSTALL_BINDIR}) @@ -278,7 +286,7 @@ set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix} add_test (unit_test ${test_wrap} ${unit_test_LOCATION}) add_test (start_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/start_broker${test_script_suffix}) -add_test (qpid-client-test ${test_wrap} ${qpid-client_test_LOCATION}) +add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION}) add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100) add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix}) add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet) @@ -288,6 +296,7 @@ if (PYTHON_EXECUTABLE) endif (PYTHON_EXECUTABLE) add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix}) if (PYTHON_EXECUTABLE) + add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix}) add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix}) if (BUILD_ACL) add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix}) diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 3c0cff7350..30441cd03c 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -102,9 +102,9 @@ struct SimpleListener : public MessageListener } }; -struct ClientSessionFixture : public ProxySessionFixture +struct ClientSessionFixture : public SessionFixture { - ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) { + ClientSessionFixture(Broker::Options opts = Broker::Options()) : SessionFixture(opts) { session.queueDeclare(arg::queue="my-queue"); } }; @@ -150,16 +150,6 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) BOOST_CHECK_EQUAL(boost::lexical_cast(i), listener.messages[i].getData()); } -// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. -void testSuspend0Timeout() { - ClientSessionFixture fix; - fix.session.suspend(); // session has 0 timeout. - try { - fix.connection.resume(fix.session); - BOOST_FAIL("Expected InvalidArgumentException."); - } catch(const InternalErrorException&) {} -} - QPID_AUTO_TEST_CASE(testUseSuspendedError) { ClientSessionFixture fix; @@ -171,18 +161,6 @@ QPID_AUTO_TEST_CASE(testUseSuspendedError) } catch(const NotAttachedException&) {} } -// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. -void testSuspendResume() { - ClientSessionFixture fix; - fix.session.timeout(60); - fix.session.suspend(); - // Make sure we are still subscribed after resume. - fix.connection.resume(fix.session); - fix.session.messageTransfer(arg::content=Message("my-message", "my-queue")); - BOOST_CHECK_EQUAL("my-message", fix.subs.get("my-queue", TIME_SEC).getData()); -} - - QPID_AUTO_TEST_CASE(testSendToSelf) { ClientSessionFixture fix; SimpleListener mylistener; diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index 88a1cd99c2..fe72f42a46 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption) TopicExchange topic ("topic1", false, args); intrusive_ptr msg1 = cmessage("direct1", "abc"); - msg1->getProperties()->getApplicationHeaders().setString("a", "abc"); + msg1->insertCustomProperty("a", "abc"); DeliverableMessage dmsg1(msg1); FieldTable args2; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index dceabe36cc..78ac6db5f1 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -304,9 +304,9 @@ TESTS_ENVIRONMENT = \ system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \ run_msg_group_tests -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \ +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ - run_queue_flow_limit_tests + run_queue_flow_limit_tests ipv6_test EXTRA_DIST += \ run_test vg_check \ @@ -321,6 +321,8 @@ EXTRA_DIST += \ config.null \ ais_check \ run_federation_tests \ + run_federation_sys_tests \ + run_long_federation_sys_tests \ run_cli_tests \ run_acl_tests \ .valgrind.supp \ @@ -362,6 +364,7 @@ LONG_TESTS+=start_broker \ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \ run_msg_groups_tests_soak \ stop_broker \ + run_long_federation_sys_tests \ run_failover_soak reliable_replication_test \ federated_cluster_test_with_node_failure diff --git a/qpid/cpp/src/tests/MessageReplayTracker.cpp b/qpid/cpp/src/tests/MessageReplayTracker.cpp index 3d79ee53c2..e35f673683 100644 --- a/qpid/cpp/src/tests/MessageReplayTracker.cpp +++ b/qpid/cpp/src/tests/MessageReplayTracker.cpp @@ -51,7 +51,7 @@ class ReplayBufferChecker QPID_AUTO_TEST_CASE(testReplay) { - ProxySessionFixture fix; + SessionFixture fix; fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); MessageReplayTracker tracker(10); @@ -77,7 +77,7 @@ QPID_AUTO_TEST_CASE(testReplay) QPID_AUTO_TEST_CASE(testCheckCompletion) { - ProxySessionFixture fix; + SessionFixture fix; fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); MessageReplayTracker tracker(10); diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index fae45a94d0..418653978b 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -611,6 +611,28 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) fix.admin.deleteQueue("q"); } +QPID_AUTO_TEST_CASE(testAssertExchangeOption) +{ + MessagingFixture fix; + std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}"; + Sender s1 = fix.session.createSender(a1); + s1.close(); + Receiver r1 = fix.session.createReceiver(a1); + r1.close(); + + std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}"; + Sender s2 = fix.session.createSender(a2); + s2.close(); + BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed); + + std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}"; + BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed); + Receiver r3 = fix.session.createReceiver(a3); + r3.close(); + + fix.admin.deleteExchange("e"); +} + QPID_AUTO_TEST_CASE(testGetSender) { QueueFixture fix; @@ -1064,6 +1086,20 @@ QPID_AUTO_TEST_CASE(testAcknowledgeUpTo) BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); } +QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange) +{ + QueueFixture fix; + Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str()); + Message out("test-message"); + out.setSubject("my-subject"); + sender.send(out); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(Duration::SECOND * 5); + fix.session.acknowledge(); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject()); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/Qmf2.cpp b/qpid/cpp/src/tests/Qmf2.cpp index 66c774accd..bc263d5c6d 100644 --- a/qpid/cpp/src/tests/Qmf2.cpp +++ b/qpid/cpp/src/tests/Qmf2.cpp @@ -23,12 +23,36 @@ #include "qmf/QueryImpl.h" #include "qmf/SchemaImpl.h" #include "qmf/exceptions.h" - +#include "qpid/messaging/Connection.h" +#include "qmf/PosixEventNotifierImpl.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentSessionImpl.h" +#include "qmf/ConsoleSession.h" +#include "qmf/ConsoleSessionImpl.h" #include "unit_test.h" +using namespace std; using namespace qpid::types; +using namespace qpid::messaging; using namespace qmf; +bool isReadable(int fd) +{ + fd_set rfds; + struct timeval tv; + int nfds, result; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + nfds = fd + 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + + result = select(nfds, &rfds, NULL, NULL, &tv); + + return result > 0; +} + namespace qpid { namespace tests { @@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema) BOOST_CHECK_THROW(method.getArgument(3), QmfException); } +QPID_AUTO_TEST_CASE(testAgentSessionEventListener) +{ + Connection connection("localhost"); + AgentSession session(connection, ""); + posix::EventNotifier notifier(session); + + AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session); + + BOOST_CHECK(sessionImpl.getEventNotifier() != 0); +} + +QPID_AUTO_TEST_CASE(testConsoleSessionEventListener) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + + ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session); + + BOOST_CHECK(sessionImpl.getEventNotifier() != 0); +} + +QPID_AUTO_TEST_CASE(testGetHandle) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + + BOOST_CHECK(notifier.getHandle() > 0); +} + +QPID_AUTO_TEST_CASE(testSetReadableToFalse) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + PosixEventNotifierImplAccess::get(notifier).setReadable(false); + + bool readable(isReadable(notifier.getHandle())); + BOOST_CHECK(!readable); +} + +QPID_AUTO_TEST_CASE(testSetReadable) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + PosixEventNotifierImplAccess::get(notifier).setReadable(true); + + bool readable(isReadable(notifier.getHandle())); + BOOST_CHECK(readable); +} + +QPID_AUTO_TEST_CASE(testSetReadableMultiple) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + for (int i = 0; i < 15; i++) + PosixEventNotifierImplAccess::get(notifier).setReadable(true); + PosixEventNotifierImplAccess::get(notifier).setReadable(false); + + bool readable(isReadable(notifier.getHandle())); + BOOST_CHECK(!readable); +} + +QPID_AUTO_TEST_CASE(testDeleteNotifier) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session); + { + posix::EventNotifier notifier(session); + BOOST_CHECK(sessionImpl.getEventNotifier() != 0); + } + BOOST_CHECK(sessionImpl.getEventNotifier() == 0); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp index bd18fa45fb..cea8bbf0db 100644 --- a/qpid/cpp/src/tests/QueueEvents.cpp +++ b/qpid/cpp/src/tests/QueueEvents.cpp @@ -147,7 +147,7 @@ struct EventRecorder QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing) { - ProxySessionFixture fixture; + SessionFixture fixture; //register dummy event listener to broker EventRecorder listener; fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1)); @@ -194,7 +194,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing) QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly) { - ProxySessionFixture fixture; + SessionFixture fixture; //register dummy event listener to broker EventRecorder listener; fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1)); diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index 5455105078..f735e09449 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -152,7 +152,7 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount) std::auto_ptr policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING); policy->update(args); - ProxySessionFixture f; + SessionFixture f; std::string q("my-ring-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); for (int i = 0; i < 10; i++) { @@ -187,7 +187,7 @@ QPID_AUTO_TEST_CASE(testRingPolicySize) std::auto_ptr policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING); policy->update(args); - ProxySessionFixture f; + SessionFixture f; std::string q("my-ring-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); @@ -259,7 +259,7 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy) std::auto_ptr policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT); policy->update(args); - ProxySessionFixture f; + SessionFixture f; std::string q("my-ring-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); LocalQueue incoming; @@ -285,7 +285,7 @@ QPID_AUTO_TEST_CASE(testPolicyWithDtx) std::auto_ptr policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); policy->update(args); - ProxySessionFixture f; + SessionFixture f; std::string q("my-policy-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); LocalQueue incoming; @@ -345,7 +345,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore) // Disable flow control, or else we'll never hit the max limit args.setInt(QueueFlowLimit::flowStopCountKey, 0); - ProxySessionFixture f; + SessionFixture f; std::string q("my-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); LocalQueue incoming; @@ -371,7 +371,7 @@ QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit) std::auto_ptr policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); policy->update(args); - ProxySessionFixture f; + SessionFixture f; std::string q("q"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); f.session.txSelect(); @@ -388,7 +388,7 @@ QPID_AUTO_TEST_CASE(testCapacityConversion) args.setString("qpid.max_count", "5"); args.setString("qpid.flow_stop_count", "0"); - ProxySessionFixture f; + SessionFixture f; std::string q("q"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); for (int i = 0; i < 5; i++) { diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 98fbc2cba4..5274f2370d 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -81,13 +81,14 @@ public: Message& getMessage() { return *(msg.get()); } }; -intrusive_ptr create_message(std::string exchange, std::string routingKey) { +intrusive_ptr create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { intrusive_ptr msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get(true)->setRoutingKey(routingKey); + if (ttl) msg->getFrames().getHeaders()->get(true)->setTtl(ttl); return msg; } @@ -441,10 +442,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties()->getApplicationHeaders().setString(key,"b"); - msg3->getProperties()->getApplicationHeaders().setString(key,"c"); - msg4->getProperties()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"b"); + msg3->insertCustomProperty(key,"c"); + msg4->insertCustomProperty(key,"a"); //enqueue 4 message queue->deliver(msg1); @@ -466,9 +467,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ intrusive_ptr msg5 = create_message("e", "A"); intrusive_ptr msg6 = create_message("e", "B"); intrusive_ptr msg7 = create_message("e", "C"); - msg5->getProperties()->getApplicationHeaders().setString(key,"a"); - msg6->getProperties()->getApplicationHeaders().setString(key,"b"); - msg7->getProperties()->getApplicationHeaders().setString(key,"c"); + msg5->insertCustomProperty(key,"a"); + msg6->insertCustomProperty(key,"b"); + msg7->insertCustomProperty(key,"c"); queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); @@ -503,7 +504,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); queue->deliver(msg1); queue->deliver(msg2); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); @@ -535,12 +536,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties()->getApplicationHeaders().setString(key,"b"); - msg3->getProperties()->getApplicationHeaders().setString(key,"c"); - msg4->getProperties()->getApplicationHeaders().setString(key,"a"); - msg5->getProperties()->getApplicationHeaders().setString(key,"b"); - msg6->getProperties()->getApplicationHeaders().setString(key,"c"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"b"); + msg3->insertCustomProperty(key,"c"); + msg4->insertCustomProperty(key,"a"); + msg5->insertCustomProperty(key,"b"); + msg6->insertCustomProperty(key,"c"); //enqueue 4 message queue->deliver(msg1); @@ -605,8 +606,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"a"); queue1->deliver(msg1); queue2->deliver(msg1); @@ -649,8 +650,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"a"); // 3 queue1->deliver(msg1); // 4 @@ -670,12 +671,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr m = create_message("exchange", "key"); - if (i % 2) { - if (oddTtl) m->getProperties()->setTtl(oddTtl); - } else { - if (evenTtl) m->getProperties()->setTtl(evenTtl); - } + intrusive_ptr m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } @@ -738,8 +734,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { std::string("c"), std::string("c"), std::string("c") }; for (int i = 0; i < 9; ++i) { intrusive_ptr msg = create_message("e", "A"); - msg->getProperties()->getApplicationHeaders().setString("GROUP-ID", groups[i]); - msg->getProperties()->getApplicationHeaders().setInt("MY-ID", i); + msg->insertCustomProperty("GROUP-ID", groups[i]); + msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); } @@ -885,8 +881,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Owners= ^C3, intrusive_ptr msg = create_message("e", "A"); - msg->getProperties()->getApplicationHeaders().setString("GROUP-ID", "a"); - msg->getProperties()->getApplicationHeaders().setInt("MY-ID", 9); + msg->insertCustomProperty("GROUP-ID", "a"); + msg->insertCustomProperty("MY-ID", 9); queue->deliver(msg); // Queue = a-2, a-9 @@ -896,8 +892,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { BOOST_CHECK( !gotOne ); msg = create_message("e", "A"); - msg->getProperties()->getApplicationHeaders().setString("GROUP-ID", "b"); - msg->getProperties()->getApplicationHeaders().setInt("MY-ID", 10); + msg->insertCustomProperty("GROUP-ID", "b"); + msg->insertCustomProperty("MY-ID", 10); queue->deliver(msg); // Queue = a-2, a-9, b-10 @@ -927,7 +923,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { for (int i = 0; i < 3; ++i) { intrusive_ptr msg = create_message("e", "A"); // no "GROUP-ID" header - msg->getProperties()->getApplicationHeaders().setInt("MY-ID", i); + msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); } diff --git a/qpid/cpp/src/tests/ReplicationTest.cpp b/qpid/cpp/src/tests/ReplicationTest.cpp index 7310a3fe20..1219a6b59e 100644 --- a/qpid/cpp/src/tests/ReplicationTest.cpp +++ b/qpid/cpp/src/tests/ReplicationTest.cpp @@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testReplicationExchange) { qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of("qpidd") ("--replication-exchange-name=qpid.replication"))); - ProxySessionFixture f(brokerOpts); + SessionFixture f(brokerOpts); std::string dataQ("queue-1"); diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp index 157cabfb63..3be9bb0cbc 100644 --- a/qpid/cpp/src/tests/SessionState.cpp +++ b/qpid/cpp/src/tests/SessionState.cpp @@ -43,7 +43,7 @@ using namespace qpid::framing; // Apply f to [begin, end) and accumulate the result template T applyAccumulate(Iter begin, Iter end, T seed, const F& f) { - return std::accumulate(begin, end, seed, bind(std::plus(), _1, bind(f, _2))); + return std::accumulate(begin, end, seed, boost::bind(std::plus(), _1, boost::bind(f, _2))); } // Create a frame with a one-char string. @@ -105,8 +105,8 @@ size_t transferN(qpid::SessionState& s, string content) { char last = content[content.size()-1]; content.resize(content.size()-1); size += applyAccumulate(content.begin(), content.end(), 0, - bind(&send, ref(s), - bind(contentFrameChar, _1, false))); + boost::bind(&send, boost::ref(s), + boost::bind(contentFrameChar, _1, false))); size += send(s, contentFrameChar(last, true)); } return size; @@ -115,7 +115,7 @@ size_t transferN(qpid::SessionState& s, string content) { // Send multiple transfers with single-byte content. size_t transfers(qpid::SessionState& s, string content) { return applyAccumulate(content.begin(), content.end(), 0, - bind(transfer1Char, ref(s), _1)); + boost::bind(transfer1Char, boost::ref(s), _1)); } size_t contentFrameSize(size_t n=1) { return AMQFrame(( AMQContentBody())).encodedSize() + n; } diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h deleted file mode 100644 index d195f11aa9..0000000000 --- a/qpid/cpp/src/tests/SocketProxy.h +++ /dev/null @@ -1,183 +0,0 @@ -#ifndef SOCKETPROXY_H -#define SOCKETPROXY_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/IOHandle.h" -#ifdef _WIN32 -# include "qpid/sys/windows/IoHandlePrivate.h" - typedef SOCKET FdType; -#else -# include "qpid/sys/posix/PrivatePosix.h" - typedef int FdType; -#endif -#include "qpid/sys/Socket.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Mutex.h" -#include "qpid/log/Statement.h" - -#include - -namespace qpid { -namespace tests { - -/** - * A simple socket proxy that forwards to another socket. - * Used between client & local broker to simulate network failures. - */ -class SocketProxy : private qpid::sys::Runnable -{ - // Need a Socket we can get the fd from - class LowSocket : public qpid::sys::Socket { - public: -#ifdef _WIN32 - FdType getFd() { return toSocketHandle(*this); } -#else - FdType getFd() { return toFd(impl); } -#endif - }; - - public: - /** Connect to connectPort on host, start a forwarding thread. - * Listen for connection on getPort(). - */ - SocketProxy(int connectPort, const std::string host="localhost") - : closed(false), joined(true), - port(listener.listen()), dropClient(), dropServer() - { - client.connect(host, boost::lexical_cast(connectPort)); - joined = false; - thread = qpid::sys::Thread(static_cast(this)); - } - - ~SocketProxy() { close(); if (!joined) thread.join(); } - - /** Simulate a network disconnect. */ - void close() { - { - qpid::sys::Mutex::ScopedLock l(lock); - if (closed) { return; } - closed=true; - } - if (thread && thread != qpid::sys::Thread::current()) { - thread.join(); - joined = true; - } - client.close(); - } - - /** Simulate lost packets, drop data from client */ - void dropClientData(bool drop=true) { dropClient=drop; } - - /** Simulate lost packets, drop data from server */ - void dropServerData(bool drop=true) { dropServer=drop; } - - bool isClosed() const { - qpid::sys::Mutex::ScopedLock l(lock); - return closed; - } - - uint16_t getPort() const { return port; } - - private: - static void throwErrno(const std::string& msg) { - throw qpid::Exception(msg+":"+qpid::sys::strError(errno)); - } - static void throwIf(bool condition, const std::string& msg) { - if (condition) throw qpid::Exception(msg); - } - - void run() { - std::auto_ptr server; - try { - fd_set socks; - FdType maxFd = listener.getFd(); - struct timeval tmo; - for (;;) { - FD_ZERO(&socks); - FD_SET(maxFd, &socks); - tmo.tv_sec = 0; - tmo.tv_usec = 500 * 1000; - if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { - qpid::sys::Mutex::ScopedLock l(lock); - throwIf(closed, "SocketProxy: Closed by close()"); - continue; - } - throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed"); - break; // Accept ready... go to next step - } - server.reset(reinterpret_cast(listener.accept())); - maxFd = server->getFd(); - if (client.getFd() > maxFd) - maxFd = client.getFd(); - char buffer[1024]; - for (;;) { - FD_ZERO(&socks); - tmo.tv_sec = 0; - tmo.tv_usec = 500 * 1000; - FD_SET(client.getFd(), &socks); - FD_SET(server->getFd(), &socks); - if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { - qpid::sys::Mutex::ScopedLock l(lock); - throwIf(closed, "SocketProxy: Closed by close()"); - continue; - } - // Something is set; relay data as needed until something closes - if (FD_ISSET(server->getFd(), &socks)) { - int n = server->read(buffer, sizeof(buffer)); - throwIf(n <= 0, "SocketProxy: server disconnected"); - if (!dropServer) client.write(buffer, n); - } - if (FD_ISSET(client.getFd(), &socks)) { - int n = client.read(buffer, sizeof(buffer)); - throwIf(n <= 0, "SocketProxy: client disconnected"); - if (!dropServer) server->write(buffer, n); - } - if (!FD_ISSET(client.getFd(), &socks) && - !FD_ISSET(server->getFd(), &socks)) - throwIf(true, "SocketProxy: No handle ready"); - } - } - catch (const std::exception& e) { - QPID_LOG(debug, "SocketProxy::run exception: " << e.what()); - } - try { - if (server.get()) server->close(); - close(); - } - catch (const std::exception& e) { - QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); - } - } - - mutable qpid::sys::Mutex lock; - mutable bool closed; - bool joined; - LowSocket client, listener; - uint16_t port; - qpid::sys::Thread thread; - bool dropClient, dropServer; -}; - -}} // namespace qpid::tests - -#endif diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index 210abf0a5b..152581e4ba 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -50,10 +50,9 @@ struct TxPublishTest TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")), + msg(MessageUtils::createMessage("exchange", "routing_key", true)), op(msg) { - msg->getProperties()->setDeliveryMode(PERSISTENT); op.deliverTo(queue1); op.deliverTo(queue2); } diff --git a/qpid/cpp/src/tests/Url.cpp b/qpid/cpp/src/tests/Url.cpp index 234a62ee91..b30de682bc 100644 --- a/qpid/cpp/src/tests/Url.cpp +++ b/qpid/cpp/src/tests/Url.cpp @@ -60,6 +60,32 @@ QPID_AUTO_TEST_CASE(TestParseXyz) { BOOST_CHECK_EQUAL(Url("xyz:host").str(), "amqp:xyz:host:5672"); } +QPID_AUTO_TEST_CASE(TestParseTricky) { + BOOST_CHECK_EQUAL(Url("amqp").str(), "amqp:tcp:amqp:5672"); + BOOST_CHECK_EQUAL(Url("amqp:tcp").str(), "amqp:tcp:tcp:5672"); + // These are ambiguous parses and arguably not the best result + BOOST_CHECK_EQUAL(Url("amqp:876").str(), "amqp:tcp:876:5672"); + BOOST_CHECK_EQUAL(Url("tcp:567").str(), "amqp:tcp:567:5672"); +} + +QPID_AUTO_TEST_CASE(TestParseIPv6) { + Url u1("[::]"); + BOOST_CHECK_EQUAL(u1[0].host, "::"); + BOOST_CHECK_EQUAL(u1[0].port, 5672); + Url u2("[::1]"); + BOOST_CHECK_EQUAL(u2[0].host, "::1"); + BOOST_CHECK_EQUAL(u2[0].port, 5672); + Url u3("[::127.0.0.1]"); + BOOST_CHECK_EQUAL(u3[0].host, "::127.0.0.1"); + BOOST_CHECK_EQUAL(u3[0].port, 5672); + Url u4("[2002::222:68ff:fe0b:e61a]"); + BOOST_CHECK_EQUAL(u4[0].host, "2002::222:68ff:fe0b:e61a"); + BOOST_CHECK_EQUAL(u4[0].port, 5672); + Url u5("[2002::222:68ff:fe0b:e61a]:123"); + BOOST_CHECK_EQUAL(u5[0].host, "2002::222:68ff:fe0b:e61a"); + BOOST_CHECK_EQUAL(u5[0].port, 123); +} + QPID_AUTO_TEST_CASE(TestParseMultiAddress) { Url::addProtocol("xyz"); URL_CHECK_STR("amqp:tcp:host:0,xyz:foo:123,tcp:foo:0,xyz:bar:1"); diff --git a/qpid/cpp/src/tests/XmlClientSessionTest.cpp b/qpid/cpp/src/tests/XmlClientSessionTest.cpp index b3b7f12b53..b94c35ece0 100644 --- a/qpid/cpp/src/tests/XmlClientSessionTest.cpp +++ b/qpid/cpp/src/tests/XmlClientSessionTest.cpp @@ -90,7 +90,7 @@ struct SimpleListener : public MessageListener } }; -struct ClientSessionFixture : public ProxySessionFixture +struct ClientSessionFixture : public SessionFixture { void declareSubscribe(const string& q="odd_blue", const string& dest="xml") diff --git a/qpid/cpp/src/tests/allhosts b/qpid/cpp/src/tests/allhosts index e43571aed4..4b4b943156 100755 --- a/qpid/cpp/src/tests/allhosts +++ b/qpid/cpp/src/tests/allhosts @@ -29,11 +29,12 @@ Options: -s SECONDS sleep between starting commands. -q don't print banner lines for each host. -o SUFFIX log output of each command to .SUFFIX + -X passed to ssh - forward X connection. " exit 1 } -while getopts "tl:bs:dqo:" opt; do +while getopts "tl:bs:dqo:X" opt; do case $opt in l) SSHOPTS="-l$OPTARG $SSHOPTS" ;; t) SSHOPTS="-t $SSHOPTS" ;; @@ -42,6 +43,7 @@ while getopts "tl:bs:dqo:" opt; do s) SLEEP="sleep $OPTARG" ;; q) NOBANNER=1 ;; o) SUFFIX=$OPTARG ;; + X) SSHOPTS="-X $SSHOPTS" ;; *) usage;; esac done diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 4a98c638a2..16d7fb0b78 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -251,7 +251,7 @@ class Broker(Popen): def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None): + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -280,6 +280,7 @@ class Broker(Popen): cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] + if show_cmd: print cmd Popen.__init__(self, cmd, expect, stdout=PIPE) test.cleanup_stop(self) self._host = "127.0.0.1" @@ -400,7 +401,7 @@ class Cluster: _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count @@ -411,16 +412,19 @@ class Cluster: self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", BrokerTest.cluster_lib ] - self.start_n(count, expect=expect, wait=wait) + self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) - def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) - self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] - def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): - for i in range(count): self.start(expect=expect, wait=wait, args=args) + def ready(self): + for b in self: b.ready() + + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): + for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -477,31 +481,30 @@ class BrokerTest(TestCase): self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level) + b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait) + cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster def browse(self, session, queue, timeout=0): - """Assert that the contents of messages on queue (as retrieved - using session and timeout) exactly match the strings in - expect_contents""" + """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) + r.capacity = 100 try: contents = [] try: while True: contents.append(r.fetch(timeout=timeout).content) except messaging.Empty: pass - finally: pass #FIXME aconway 2011-04-14: r.close() + finally: r.close() return contents def assert_browse(self, session, queue, expect_contents, timeout=0): diff --git a/qpid/cpp/src/tests/cluster_python_tests_failing.txt b/qpid/cpp/src/tests/cluster_python_tests_failing.txt index 7ba8089946..f8639d7b59 100644 --- a/qpid/cpp/src/tests/cluster_python_tests_failing.txt +++ b/qpid/cpp/src/tests/cluster_python_tests_failing.txt @@ -1,32 +1,4 @@ qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue qpid_tests.broker_0_10.management.ManagementTest.test_connection_close -qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_end -qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail -qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid -qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion -qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout -qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended -qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_recover -qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown -qpid_tests.broker_0_10.dtx.DtxTests.test_select_required -qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback -qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback -qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known -qpid_tests.broker_0_10.dtx.DtxTests.test_start_join -qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume -qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume qpid_tests.broker_0_10.message.MessageTests.test_ttl qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py index a0ce8fb9c3..3c7e8e8020 100755 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ b/qpid/cpp/src/tests/cluster_test_logs.py @@ -53,6 +53,7 @@ def filter_log(log): 'stall for update|unstall, ignore update|cancelled offer .* unstall', 'caught up', 'active for links|Passivating links|Activating links', + 'info Connecting: .*', # UpdateClient connection 'info Connection.* connected to', # UpdateClient connection 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection 'warning Broker closed connection: 200, OK', diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 13f4f716da..879dcdaeaf 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -164,6 +164,121 @@ acl allow all all self.fail("Expected exception") except messaging.exceptions.NotFound: pass + def test_sasl_join(self): + """Verify SASL authentication between brokers when joining a cluster.""" + sasl_config=os.path.join(self.rootdir, "sasl_config") + + # Valid user/password, ensure queue is created. + c = cluster[0].connect(username="zig", password="zig") + c.session().sender("ziggy;{create:always}") + c.close() + c = cluster[1].connect(username="zig", password="zig") + c.session().receiver("ziggy;{assert:always}") + c.close() + for b in cluster: b.ready() # Make sure all brokers still running. + + # Valid user, bad password + try: + cluster[0].connect(username="zig", password="foo").close() + self.fail("Expected exception") + except messaging.exceptions.ConnectionError: pass + for b in cluster: b.ready() # Make sure all brokers still running. + + # Bad user ID + try: + cluster[0].connect(username="foo", password="bar").close() + self.fail("Expected exception") + except messaging.exceptions.ConnectionError: pass + for b in cluster: b.ready() # Make sure all brokers still running. + + # Action disallowed by ACL + c = cluster[0].connect(username="zag", password="zag") + try: + s = c.session() + s.sender("zaggy;{create:always}") + s.close() + self.fail("Expected exception") + except messaging.exceptions.UnauthorizedAccess: pass + # make sure the queue was not created at the other node. + c = cluster[0].connect(username="zag", password="zag") + try: + s = c.session() + s.sender("zaggy;{assert:always}") + s.close() + self.fail("Expected exception") + except messaging.exceptions.NotFound: pass + + def test_sasl_join(self): + """Verify SASL authentication between brokers when joining a cluster.""" + # Valid user/password, ensure queue is created. + c = cluster[0].connect(username="zig", password="zig") + c.session().sender("ziggy;{create:always}") + c.close() + c = cluster[1].connect(username="zig", password="zig") + c.session().receiver("ziggy;{assert:always}") + c.close() + for b in cluster: b.ready() # Make sure all brokers still running. + + # Valid user, bad password + try: + cluster[0].connect(username="zig", password="foo").close() + self.fail("Expected exception") + except messaging.exceptions.ConnectionError: pass + for b in cluster: b.ready() # Make sure all brokers still running. + + # Bad user ID + try: + cluster[0].connect(username="foo", password="bar").close() + self.fail("Expected exception") + except messaging.exceptions.ConnectionError: pass + for b in cluster: b.ready() # Make sure all brokers still running. + + # Action disallowed by ACL + c = cluster[0].connect(username="zag", password="zag") + try: + s = c.session() + s.sender("zaggy;{create:always}") + s.close() + self.fail("Expected exception") + except messaging.exceptions.UnauthorizedAccess: pass + # make sure the queue was not created at the other node. + c = cluster[0].connect(username="zag", password="zag") + try: + s = c.session() + s.sender("zaggy;{assert:always}") + s.close() + self.fail("Expected exception") + except messaging.exceptions.NotFound: pass + + def test_sasl_join(self): + """Verify SASL authentication between brokers when joining a cluster.""" + sasl_config=os.path.join(self.rootdir, "sasl_config") + # Test with a valid username/password + cluster = self.cluster(1, args=["--auth", "yes", + "--sasl-config", sasl_config, + "--load-module", os.getenv("ACL_LIB"), + "--cluster-username=zig", + "--cluster-password=zig", + "--cluster-mechanism=PLAIN" + ]) + cluster.start() + cluster.ready() + c = cluster[1].connect(username="zag", password="zag") + + # Test with an invalid username/password + cluster = self.cluster(1, args=["--auth", "yes", + "--sasl-config", sasl_config, + "--load-module", os.getenv("ACL_LIB"), + "--cluster-username=x", + "--cluster-password=y", + "--cluster-mechanism=PLAIN" + ]) + try: + cluster.start(expect=EXPECT_EXIT_OK) + cluster[1].ready() + self.fail("Expected exception") + except: pass + def test_user_id_update(self): """Ensure that user-id of an open session is updated to new cluster members""" sasl_config=os.path.join(self.rootdir, "sasl_config") @@ -713,6 +828,272 @@ acl allow all all cluster.start() fetch(cluster[2]) +# Some utility code for transaction tests +XA_RBROLLBACK = 1 +XA_RBTIMEOUT = 2 +XA_OK = 0 +dtx_branch_counter = 0 + +class DtxStatusException(Exception): + def __init__(self, expect, actual): + self.expect = expect + self.actual = actual + + def str(self): + return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual) + +class DtxTestFixture: + """Bundle together some common requirements for dtx tests.""" + def __init__(self, test, broker, name, exclusive=False): + self.test = test + self.broker = broker + self.name = name + # Use old API. DTX is not supported in messaging API. + self.connection = broker.connect_old() + self.session = self.connection.session(name, 1) # 1 second timeout + self.queue = self.session.queue_declare(name, exclusive=exclusive) + self.session.dtx_select() + self.consumer = None + + def xid(self, id=None): + if id is None: id = self.name + return self.session.xid(format=0, global_id=id) + + def check_status(self, expect, actual): + if expect != actual: raise DtxStatusException(expect, actual) + + def start(self, id=None, resume=False): + self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status) + + def end(self, id=None, suspend=False): + self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status) + + def prepare(self, id=None): + self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status) + + def commit(self, id=None, one_phase=True): + self.check_status( + XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status) + + def rollback(self, id=None): + self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status) + + def set_timeout(self, timeout, id=None): + self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout) + + def send(self, messages): + for m in messages: + dp=self.session.delivery_properties(routing_key=self.name) + mp=self.session.message_properties() + self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m)) + + def accept(self): + """Accept 1 message from queue""" + consumer_tag="%s-consumer"%(self.name) + self.session.message_subscribe(queue=self.name, destination=consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag) + self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + msg = self.session.incoming(consumer_tag).get(timeout=1) + self.session.message_cancel(destination=consumer_tag) + self.session.message_accept(qpid.datatypes.RangedSet(msg.id)) + return msg + + + def verify(self, sessions, messages): + for s in sessions: + self.test.assert_browse(s, self.name, messages) + +class DtxTests(BrokerTest): + + def test_dtx_update(self): + """Verify that DTX transaction state is updated to a new broker. + Start a collection of transactions, then add a new cluster member, + then verify they commit/rollback correctly on the new broker.""" + + # Note: multiple test have been bundled into one to avoid the need to start/stop + # multiple brokers per test. + + cluster=self.cluster(1) + sessions = [cluster[0].connect().session()] # For verify + + # Transaction that will be open when new member joins, then committed. + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.start() + t1.send(["1", "2"]) + t1.verify(sessions, []) # Not visible outside of transaction + + # Transaction that will be open when new member joins, then rolled back. + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.start() + t2.send(["1", "2"]) + + # Transaction that will be prepared when new member joins, then committed. + t3 = DtxTestFixture(self, cluster[0], "t3") + t3.start() + t3.send(["1", "2"]) + t3.end() + t3.prepare() + t1.verify(sessions, []) # Not visible outside of transaction + + # Transaction that will be prepared when new member joins, then rolled back. + t4 = DtxTestFixture(self, cluster[0], "t4") + t4.start() + t4.send(["1", "2"]) + t4.end() + t4.prepare() + + # Transaction using an exclusive queue + t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True) + t5.start() + t5.send(["1", "2"]) + + # Accept messages in a transaction before/after join then commit + t6 = DtxTestFixture(self, cluster[0], "t6") + t6.send(["a","b","c"]) + t6.start() + self.assertEqual(t6.accept().body, "a"); + + # Accept messages in a transaction before/after join then roll back + t7 = DtxTestFixture(self, cluster[0], "t7") + t7.send(["a","b","c"]) + t7.start() + self.assertEqual(t7.accept().body, "a"); + + # Ended, suspended transactions across join. + t8 = DtxTestFixture(self, cluster[0], "t8") + t8.start(id="1") + t8.send(["x"]) + t8.end(id="1", suspend=True) + t8.start(id="2") + t8.send(["y"]) + t8.end(id="2") + t8.start() + t8.send("z") + + + # Start new cluster member + cluster.start() + sessions.append(cluster[1].connect().session()) + + # Commit t1 + t1.send(["3","4"]) + t1.verify(sessions, []) + t1.end() + t1.commit(one_phase=True) + t1.verify(sessions, ["1","2","3","4"]) + + # Rollback t2 + t2.send(["3","4"]) + t2.end() + t2.rollback() + t2.verify(sessions, []) + + # Commit t3 + t3.commit(one_phase=False) + t3.verify(sessions, ["1","2"]) + + # Rollback t4 + t4.rollback() + t4.verify(sessions, []) + + # Commit t5 + t5.send(["3","4"]) + t5.verify(sessions, []) + t5.end() + t5.commit(one_phase=True) + t5.verify(sessions, ["1","2","3","4"]) + + # Commit t6 + self.assertEqual(t6.accept().body, "b"); + t6.verify(sessions, ["c"]) + t6.end() + t6.commit(one_phase=True) + t6.session.close() # Make sure they're not requeued by the session. + t6.verify(sessions, ["c"]) + + # Rollback t7 + self.assertEqual(t7.accept().body, "b"); + t7.end() + t7.rollback() + t7.verify(sessions, ["a", "b", "c"]) + + # Resume t8 + t8.end() + t8.commit(one_phase=True) + t8.start("1", resume=True) + t8.end("1") + t8.commit("1", one_phase=True) + t8.commit("2", one_phase=True) + t8.verify(sessions, ["z", "x","y"]) + + + def test_dtx_failover_rollback(self): + """Kill a broker during a transaction, verify we roll back correctly""" + cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL) + cluster.start(expect=EXPECT_RUNNING) + + # Test unprepared at crash + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.send(["a"]) # Not in transaction + t1.start() + t1.send(["b"]) # In transaction + + # Test prepared at crash + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.send(["a"]) # Not in transaction + t2.start() + t2.send(["b"]) # In transaction + t2.end() + t2.prepare() + + # Crash the broker + cluster[0].kill() + + # Transactional changes should not appear + s = cluster[1].connect().session(); + self.assert_browse(s, "t1", ["a"]) + self.assert_browse(s, "t2", ["a"]) + + def test_dtx_timeout(self): + """Verify that dtx timeout works""" + cluster = self.cluster(1) + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.start() + t1.set_timeout(1) + time.sleep(1.1) + try: + t1.end() + self.fail("Expected rollback timeout.") + except DtxStatusException, e: + self.assertEqual(e.actual, XA_RBTIMEOUT) + +class TxTests(BrokerTest): + + def test_tx_update(self): + """Verify that transaction state is updated to a new broker""" + + def make_message(session, body=None, key=None, id=None): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + return qpid.datatypes.Message(dp, mp, body) + + cluster=self.cluster(1) + # Use old API. TX is not supported in messaging API. + c = cluster[0].connect_old() + s = c.session("tx-session", 1) + s.queue_declare(queue="q") + # Start transaction + s.tx_select() + s.message_transfer(message=make_message(s, "1", "q")) + # Start new member mid-transaction + cluster.start() + # Do more work + s.message_transfer(message=make_message(s, "2", "q")) + # Commit the transaction and verify the results. + s.tx_commit() + for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION= is set""" def duration(self): @@ -1001,6 +1382,8 @@ class LongTests(BrokerTest): logger = logging.getLogger() log_level = logger.getEffectiveLevel() logger.setLevel(logging.ERROR) + sender = None + receiver = None try: # Start sender and receiver threads receiver = Receiver(cluster[0], "q;{create:always}") @@ -1031,8 +1414,8 @@ class LongTests(BrokerTest): finally: # Detach to avoid slow reconnect attempts during shut-down if test fails. - sender.connection.detach() - receiver.connection.detach() + if sender: sender.connection.detach() + if receiver: receiver.connection.detach() logger.setLevel(log_level) def test_msg_group_failover(self): diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp index 3536ffddbe..3e844b4e58 100644 --- a/qpid/cpp/src/tests/exception_test.cpp +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -92,32 +92,30 @@ QPID_AUTO_TEST_CASE(TestSessionBusy) { } QPID_AUTO_TEST_CASE(DisconnectedPop) { - ProxySessionFixture fix; - ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT)); + SessionFixture fix; fix.session.queueDeclare(arg::queue="q"); fix.subs.subscribe(fix.lq, "q"); Catcher pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC)); - fix.connection.proxy.close(); + fix.shutdownBroker(); BOOST_CHECK(pop.join()); } QPID_AUTO_TEST_CASE(DisconnectedListen) { - ProxySessionFixture fix; + SessionFixture fix; struct NullListener : public MessageListener { void received(Message&) { BOOST_FAIL("Unexpected message"); } } l; - ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT)); fix.session.queueDeclare(arg::queue="q"); fix.subs.subscribe(l, "q"); Catcher runner(bind(&SubscriptionManager::run, boost::ref(fix.subs))); - fix.connection.proxy.close(); - runner.join(); + fix.shutdownBroker(); + runner.join(); BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure); } QPID_AUTO_TEST_CASE(NoSuchQueueTest) { - ProxySessionFixture fix; + SessionFixture fix; ScopedSuppressLogging sl; // Suppress messages for expected errors. BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException); } diff --git a/qpid/cpp/src/tests/federated_topic_test b/qpid/cpp/src/tests/federated_topic_test index b1063c7e8c..2e55ddcfaa 100755 --- a/qpid/cpp/src/tests/federated_topic_test +++ b/qpid/cpp/src/tests/federated_topic_test @@ -42,13 +42,12 @@ while getopts "s:m:b:" opt ; do esac done -MY_DIR=$(dirname $(which $0)) source ./test_env.sh trap stop_brokers EXIT start_broker() { - ${MY_DIR}/../qpidd --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port + $QPIDD_EXEC --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port } start_brokers() { @@ -76,39 +75,39 @@ subscribe() { echo Subscriber $1 connecting on $MY_PORT LOG="subscriber_$1.log" - ${MY_DIR}/topic_listener -p $MY_PORT > $LOG 2>&1 && rm -f $LOG + ./qpid-topic-listener -p $MY_PORT > $LOG 2>&1 && rm -f $LOG } publish() { - ${MY_DIR}/topic_publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A + ./qpid-topic-publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A } setup_routes() { - BROKER_A="localhost:$PORT_A" - BROKER_B="localhost:$PORT_B" - BROKER_C="localhost:$PORT_C" + BROKER_A="daffodil:$PORT_A" + BROKER_B="daffodil:$PORT_B" + BROKER_C="daffodil:$PORT_C" if (($VERBOSE)); then echo "Establishing routes for topic..." fi - $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_A amq.topic topic_control B B - $PYTHON_COMMANDS/qpid-route route add $BROKER_C $BROKER_B amq.topic topic_control C C + $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_A amq.topic topic_control B B + $QPID_ROUTE_EXEC route add $BROKER_C $BROKER_B amq.topic topic_control C C if (($VERBOSE)); then echo "linked A->B->C" fi - $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.topic topic_control B B - $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.topic topic_control A A + $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.topic topic_control B B + $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.topic topic_control A A if (($VERBOSE)); then echo "linked C->B->A" echo "Establishing routes for response queue..." fi - $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.direct response B B - $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.direct response A A + $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.direct response B B + $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.direct response A A if (($VERBOSE)); then echo "linked C->B->A" for b in $BROKER_A $BROKER_B $BROKER_C; do echo "Routes for $b" - $PYTHON_COMMANDS/qpid-route route list $b + $QPID_ROUTE_EXEC route list $b done fi } diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 49bdecdd95..7d613b98ce 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -111,18 +111,18 @@ class FederationTests(TestBase010): broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] result = bridge.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) result = link.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -133,11 +133,11 @@ class FederationTests(TestBase010): qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -165,9 +165,9 @@ class FederationTests(TestBase010): except Empty: None result = bridge.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) result = link.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -178,11 +178,11 @@ class FederationTests(TestBase010): qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -209,9 +209,9 @@ class FederationTests(TestBase010): except Empty: None result = bridge.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) result = link.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -236,11 +236,11 @@ class FederationTests(TestBase010): qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] sleep(3) @@ -261,6 +261,63 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None + result = bridge.close() + self.assertEqual(result.status, 0, result) + result = link.close() + self.assertEqual(result.status, 0, result) + + self.verify_cleanup() + + def test_pull_from_queue_recovery(self): + session = self.session + + #setup queue on remote broker and add some messages + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_pull_from_queue_recovery") + r_session.queue_declare(queue="my-bridge-queue", auto_delete=True) + for i in range(1, 6): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + #setup queue to receive messages from local broker + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0, result) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) + self.assertEqual(result.status, 0, result) + + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + #recreate the remote bridge queue to invalidate the bridge session + r_session.queue_delete (queue="my-bridge-queue", if_empty=False, if_unused=False) + r_session.queue_declare(queue="my-bridge-queue", auto_delete=True) + + #add some more messages (i.e. after bridge was created) + for i in range(6, 11): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + try: + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + except Empty: + self.fail("Failed to find expected message containing 'Message %d'" % i) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + result = bridge.close() self.assertEqual(result.status, 0) result = link.close() diff --git a/qpid/cpp/src/tests/federation_sys.py b/qpid/cpp/src/tests/federation_sys.py new file mode 100755 index 0000000000..11590f684e --- /dev/null +++ b/qpid/cpp/src/tests/federation_sys.py @@ -0,0 +1,1900 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from inspect import stack +from qpid import messaging +from qpid.messaging import Message +from qpid.messaging.exceptions import Empty +from qpid.testlib import TestBase010 +from random import randint +from sys import stdout +from time import sleep + + +class Enum(object): + def __init__(self, **entries): + self.__dict__.update(entries) + def __repr__(self): + args = ['%s=%s' % (k, repr(v)) for (k,v) in vars(self).items()] + return 'Enum(%s)' % ', '.join(args) + + +class QmfTestBase010(TestBase010): + + _brokers = [] + _links = [] + _bridges = [] + _alt_exch_ops = Enum(none=0, create=1, delete=2) + + class _Broker(object): + """ + This broker proxy object holds the Qmf proxy to a broker of known address as well as the QMF broker + object, connection and sessions to the broker. + """ + def __init__(self, url): + self.url = url # format: "host:port" + url_parts = url.split(':') + self.host = url_parts[0] + self.port = int(url_parts[1]) + self.qmf_broker = None + self.connection = messaging.Connection.establish(self.url) + self.sessions = [] + def __str__(self): + return "_Broker %s:%s (%d open sessions)" % (self.host, self.port, len(self.sessions)) + def destroy(self, qmf = None): + if qmf is not None: + qmf.delBroker(self.qmf_broker.getBroker()) + for session in self.sessions: + try: # Session may have been closed by broker error + session.close() + except Exception, e: print "WARNING: %s: Unable to close session %s (%s): %s %s" % (self, session, hex(id(session)), type(e), e) + try: # Connection may have been closed by broker error + self.connection.close() + except Exception, e: print "WARNING: %s: Unable to close connection %s (%s): %s %s" % (self, self.connection, hex(id(self.connection)), type(e), e) + def session(self, name, transactional_flag = False): + session = self.connection.session(name, transactional_flag) + self.sessions.append(session) + return session + + def setUp(self): + """ + Called one before each test starts + """ + TestBase010.setUp(self) + self.startQmf(); + + def tearDown(self): + """ + Called once after each test competes. Close all Qmf objects (bridges, links and brokers) + """ + while len(self._bridges): + self._bridges.pop().close() + while len(self._links): + self._links.pop().close() + while len(self._brokers): + b = self._brokers.pop() + if len(self._brokers) <= 1: + b.destroy(None) + else: + b.destroy(self.qmf) + TestBase010.tearDown(self) + self.qmf.close() + + #--- General test utility functions + + def _get_name(self): + """ + Return the name of method which called this method stripped of "test_" prefix. Used for naming + queues and exchanges on a per-test basis. + """ + return stack()[1][3][5:] + + def _get_broker_port(self, key): + """ + Get the port of a broker defined in the environment using -D=portno + """ + return int(self.defines[key]) + + def _get_cluster_ports(self, key): + """ + Get the cluster ports from the parameters of the test which place it in the environment using + -D="port0 port1 ... portN" (space-separated) + """ + ports = [] + ports_str = self.defines[key] + if ports_str: + for p in ports_str.split(): + ports.append(int(p)) + return ports + + def _get_send_address(self, exch_name, queue_name): + """ + Get an address to which to send messages based on the exchange name and queue name, but taking into account + that the exchange name may be "" (the default exchange), in whcih case the format changes slightly. + """ + if len(exch_name) == 0: # Default exchange + return queue_name + return "%s/%s" % (exch_name, queue_name) + + def _get_broker(self, cluster_flag, broker_port_key, cluster_ports_key): + """ + Read the port numbers for pre-started brokers from the environment using keys, then find or create and return + the Qmf broker proxy for the appropriate broker + """ + if cluster_flag: + port = self._get_cluster_ports(cluster_ports_key)[0] # Always use the first node in the cluster + else: + port = self._get_broker_port(broker_port_key) + return self._find_create_broker("localhost:%s" % port) + + def _get_msg_subject(self, topic_key): + """ + Return an appropriate subject for sending a message to a known topic. Return None if there is no topic. + """ + if len(topic_key) == 0: return None + if "*" in topic_key: return topic_key.replace("*", "test") + if "#" in topic_key: return topic_key.replace("#", "multipart.test") + return topic_key + + def _send_msgs(self, session_name, broker, addr, msg_count, msg_content = "Message_%03d", topic_key = "", + msg_durable_flag = False, enq_txn_size = 0): + """ + Send messages to a broker using address addr + """ + send_session = broker.session(session_name, transactional_flag = enq_txn_size > 0) + sender = send_session.sender(addr) + txn_cnt = 0 + for i in range(0, msg_count): + sender.send(Message(msg_content % (i + 1), subject = self._get_msg_subject(topic_key), durable = msg_durable_flag)) + if enq_txn_size > 0: + txn_cnt += 1 + if txn_cnt >= enq_txn_size: + send_session.commit() + txn_cnt = 0 + if enq_txn_size > 0 and txn_cnt > 0: + send_session.commit() + sender.close() + send_session.close() + + def _receive_msgs(self, session_name, broker, addr, msg_count, msg_content = "Message_%03d", deq_txn_size = 0, + timeout = 0): + """ + Receive messages from a broker + """ + receive_session = broker.session(session_name, transactional_flag = deq_txn_size > 0) + receiver = receive_session.receiver(addr) + txn_cnt = 0 + for i in range(0, msg_count): + try: + msg = receiver.fetch(timeout = timeout) + if deq_txn_size > 0: + txn_cnt += 1 + if txn_cnt >= deq_txn_size: + receive_session.commit() + txn_cnt = 0 + receive_session.acknowledge() + except Empty: + if deq_txn_size > 0: receive_session.rollback() + receiver.close() + receive_session.close() + if i == 0: + self.fail("Broker %s queue \"%s\" is empty" % (broker.qmf_broker.getBroker().getUrl(), addr)) + else: + self.fail("Unable to receive message %d from broker %s queue \"%s\"" % (i, broker.qmf_broker.getBroker().getUrl(), addr)) + if msg.content != msg_content % (i + 1): + receiver.close() + receive_session.close() + self.fail("Unexpected message \"%s\", was expecting \"%s\"." % (msg.content, msg_content % (i + 1))) + try: + msg = receiver.fetch(timeout = 0) + if deq_txn_size > 0: receive_session.rollback() + receiver.close() + receive_session.close() + self.fail("Extra message \"%s\" found on broker %s address \"%s\"" % (msg.content, broker.qmf_broker.getBroker().getUrl(), addr)) + except Empty: + pass + if deq_txn_size > 0 and txn_cnt > 0: + receive_session.commit() + receiver.close() + receive_session.close() + + #--- QMF-specific utility functions + + def _get_qmf_property(self, props, key): + """ + Get the value of a named property key kj from a property list [(k0, v0), (k1, v1), ... (kn, vn)]. + """ + for k,v in props: + if k.name == key: + return v + return None + + def _check_qmf_return(self, method_result): + """ + Check the result of a Qmf-defined method call + """ + self.assertTrue(method_result.status == 0, method_result.text) + + def _check_optional_qmf_property(self, qmf_broker, type, qmf_object, key, expected_val, obj_ref_flag): + """ + Optional Qmf properties don't show up in the properties list when they are not specified. Checks for + these property types involve searching the properties list and making sure it is present or not as + expected. + """ + val = self._get_qmf_property(qmf_object.getProperties(), key) + if val is None: + if len(expected_val) > 0: + self.fail("%s %s exists, but has does not have %s property. Expected value: \"%s\"" % + (type, qmf_object.name, key, expected_val)) + else: + if len(expected_val) > 0: + if obj_ref_flag: + obj = self.qmf.getObjects(_objectId = val, _broker = qmf_broker.getBroker()) + self.assertEqual(len(obj), 1, "More than one object with the same objectId: %s" % obj) + val = obj[0].name + self.assertEqual(val, expected_val, "%s %s exists, but has incorrect %s property. Found \"%s\", expected \"%s\"" % + (type, qmf_object.name, key, val, expected_val)) + else: + self.fail("%s %s exists, but has an unexpected %s property \"%s\" set." % (type, qmf_object.name, key, val)) + + #--- Find/create Qmf broker objects + + def _find_qmf_broker(self, url): + """ + Find the Qmf broker object for the given broker URL. The broker must have been previously added to Qmf through + addBroker() + """ + for b in self.qmf.getObjects(_class="broker"): + if b.getBroker().getUrl() == url: + return b + return None + + def _find_create_broker(self, url): + """ + Find a running broker through Qmf. If it does not exist, add it (assuming the broker is already running). + """ + broker = self._Broker(url) + self._brokers.append(broker) + if self.qmf is not None: + qmf_broker = self._find_qmf_broker(broker.url) + if qmf_broker is None: + self.qmf.addBroker(broker.url) + broker.qmf_broker = self._find_qmf_broker(broker.url) + else: + broker.qmf_broker = qmf_broker + return broker + + #--- Find/create/delete exchanges + + def _find_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete): + """ + Find Qmf exchange object + """ + for e in self.qmf.getObjects(_class="exchange", _broker = qmf_broker.getBroker()): + if e.name == name: + if len(name) == 0 or (len(name) >= 4 and name[:4] == "amq."): return e # skip checks for special exchanges + self.assertEqual(e.type, type, + "Exchange \"%s\" exists, but is of unexpected type %s; expected type %s." % + (name, e.type, type)) + self._check_optional_qmf_property(qmf_broker, "Exchange", e, "altExchange", alternate, True) + self.assertEqual(e.durable, durable, + "Exchange \"%s\" exists, but has incorrect durability. Found durable=%s, expected durable=%s" % + (name, e.durable, durable)) + self.assertEqual(e.autoDelete, auto_delete, + "Exchange \"%s\" exists, but has incorrect auto-delete property. Found %s, expected %s" % + (name, e.autoDelete, auto_delete)) + return e + return None + + def _find_create_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete, args): + """ + Find Qmf exchange object if exchange exists, create exchange and return its Qmf object if not + """ + e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete) + if e is not None: return e + # Does not exist, so create it + props = dict({"exchange-type": type, "type": type, "durable": durable, "auto-delete": auto_delete, "alternate-exchange": alternate}, **args) + self._check_qmf_return(qmf_broker.create(type="exchange", name=name, properties=props, strict=True)) + e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete) + self.assertNotEqual(e, None, "Creation of exchange %s on broker %s failed" % (name, qmf_broker.getBroker().getUrl())) + return e + + def _find_delete_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete): + """ + Find and delete Qmf exchange object if it exists + """ + e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete) + if e is not None and not auto_delete: + self._check_qmf_return(qmf_broker.delete(type="exchange", name=name, options={})) + + #--- Find/create/delete queues + + def _find_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete): + """ + Find a Qmf queue object + """ + for q in self.qmf.getObjects(_class="queue", _broker = qmf_broker.getBroker()): + if q.name == name: + self._check_optional_qmf_property(qmf_broker, "Queue", q, "altExchange", alternate_exchange, True) + self.assertEqual(q.durable, durable, + "Queue \"%s\" exists, but has incorrect durable property. Found %s, expected %s" % + (name, q.durable, durable)) + self.assertEqual(q.exclusive, exclusive, + "Queue \"%s\" exists, but has incorrect exclusive property. Found %s, expected %s" % + (name, q.exclusive, exclusive)) + self.assertEqual(q.autoDelete, auto_delete, + "Queue \"%s\" exists, but has incorrect auto-delete property. Found %s, expected %s" % + (name, q.autoDelete, auto_delete)) + return q + return None + + def _find_create_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete, args): + """ + Find Qmf queue object if queue exists, create queue and return its Qmf object if not + """ + q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete) + if q is not None: return q + # Queue does not exist, so create it + props = dict({"durable": durable, "auto-delete": auto_delete, "exclusive": exclusive, "alternate-exchange": alternate_exchange}, **args) + self._check_qmf_return(qmf_broker.create(type="queue", name=name, properties=props, strict=True)) + q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete) + self.assertNotEqual(q, None, "Creation of queue %s on broker %s failed" % (name, qmf_broker.getBroker().getUrl())) + return q + + def _find_delete_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete, args): + """ + Find and delete Qmf queue object if it exists + """ + q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete) + if q is not None and not auto_delete: + self._check_qmf_return(qmf_broker.delete(type="queue", name=name, options={})) + + #--- Find/create/delete bindings (between an exchange and a queue) + + def _find_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args): + """ + Find a Qmf binding object + """ + for b in self.qmf.getObjects(_class="binding", _broker = qmf_broker.getBroker()): + if b.exchangeRef == qmf_exchange.getObjectId() and b.queueRef == qmf_queue.getObjectId(): + if qmf_exchange.type != "fanout": # Fanout ignores the binding key, and always returns "" as the key + self.assertEqual(b.bindingKey, binding_key, + "Binding between exchange %s and queue %s exists, but has mismatching binding key: Found %s, expected %s." % + (qmf_exchange.name, qmf_queue.name, b.bindingKey, binding_key)) + self.assertEqual(b.arguments, binding_args, + "Binding between exchange %s and queue %s exists, but has mismatching arguments: Found %s, expected %s" % + (qmf_exchange.name, qmf_queue.name, b.arguments, binding_args)) + return b + return None + + def _find_create_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args): + """ + Find Qmf binding object if it exists, create binding and return its Qmf object if not + """ + b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args) + if b is not None: return b + # Does not exist, so create it + self._check_qmf_return(qmf_broker.create(type="binding", name="%s/%s/%s" % (qmf_exchange.name, qmf_queue.name, binding_key), properties=binding_args, strict=True)) + b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args) + self.assertNotEqual(b, None, "Creation of binding between exchange %s and queue %s with key %s failed" % + (qmf_exchange.name, qmf_queue.name, binding_key)) + return b + + def _find_delete_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args): + """ + Find and delete Qmf binding object if it exists + """ + b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args) + if b is not None: + if len(qmf_exchange.name) > 0: # not default exchange + self._check_qmf_return(qmf_broker.delete(type="binding", name="%s/%s/%s" % (qmf_exchange.name, qmf_queue.name, binding_key), options={})) + + #--- Find/create a link + + def _find_qmf_link(self, qmf_from_broker_proxy, host, port): + """ + Find a Qmf link object + """ + for l in self.qmf.getObjects(_class="link", _broker=qmf_from_broker_proxy): + if l.host == host and l.port == port: + return l + return None + + def _find_create_qmf_link(self, qmf_from_broker, qmf_to_broker_proxy, link_durable_flag, auth_mechanism, user_id, + password, transport, pause_interval, link_ready_timeout): + """ + Find a Qmf link object if it exists, create it and return its Qmf link object if not + """ + to_broker_host = qmf_to_broker_proxy.host + to_broker_port = qmf_to_broker_proxy.port + l = self._find_qmf_link(qmf_from_broker.getBroker(), to_broker_host, to_broker_port) + if l is not None: return l + # Does not exist, so create it + self._check_qmf_return(qmf_from_broker.connect(to_broker_host, to_broker_port, link_durable_flag, auth_mechanism, user_id, password, transport)) + l = self._find_qmf_link(qmf_from_broker.getBroker(), to_broker_host, to_broker_port) + self.assertNotEqual(l, None, "Creation of link from broker %s to broker %s failed" % + (qmf_from_broker.getBroker().getUrl(), qmf_to_broker_proxy.getUrl())) + self._wait_for_link(l, pause_interval, link_ready_timeout) + return l + + def _wait_for_link(self, link, pause_interval, link_ready_timeout): + """ + Wait for link to become active (state=Operational) + """ + tot_time = 0 + link.update() + if link.state == "": + # Link mgmt updates for the c++ link object are disabled when in a cluster because of inconsistent state: + # one is "Operational", the other "Passive". In this case, wait a bit and hope for the best... + sleep(2*pause_interval) + else: + while link.state != "Operational" and tot_time < link_ready_timeout: + sleep(pause_interval) + tot_time += pause_interval + link.update() + self.assertEqual(link.state, "Operational", "Timeout: Link not operational, state=%s" % link.state) + + #--- Find/create a bridge + + def _find_qmf_bridge(self, qmf_broker_proxy, qmf_link, source, destination, key): + """ + Find a Qmf link object + """ + for b in self.qmf.getObjects(_class="bridge", _broker=qmf_broker_proxy): + if b.linkRef == qmf_link.getObjectId() and b.src == source and b.dest == destination and b.key == key: + return b + return None + + def _find_create_qmf_bridge(self, qmf_broker_proxy, qmf_link, queue_name, exch_name, topic_key, + queue_route_type_flag, bridge_durable_flag): + """ + Find a Qmf bridge object if it exists, create it and return its Qmf object if not + """ + if queue_route_type_flag: + src = queue_name + dest = exch_name + key = "" + else: + src = exch_name + dest = exch_name + if len(topic_key) > 0: + key = topic_key + else: + key = queue_name + b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key) + if b is not None: + return b + # Does not exist, so create it + self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1)) + b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key) + self.assertNotEqual(b, None, "Bridge creation failed: src=%s dest=%s key=%s" % (src, dest, key)) + return b + + def _wait_for_bridge(self, bridge, src_broker, dest_broker, exch_name, queue_name, topic_key, pause_interval, + bridge_ready_timeout): + """ + Wait for bridge to become active by sending messages over the bridge at 1 sec intervals until they are + observed at the destination. + """ + tot_time = 0 + active = False + send_session = src_broker.session("tx") + sender = send_session.sender(self._get_send_address(exch_name, queue_name)) + src_receive_session = src_broker.session("src_rx") + src_receiver = src_receive_session.receiver(queue_name) + dest_receive_session = dest_broker.session("dest_rx") + dest_receiver = dest_receive_session.receiver(queue_name) + while not active and tot_time < bridge_ready_timeout: + sender.send(Message("xyz123", subject = self._get_msg_subject(topic_key))) + try: + src_receiver.fetch(timeout = 0) + src_receive_session.acknowledge() + # Keep receiving msgs, as several may have accumulated + while True: + dest_receiver.fetch(timeout = 0) + dest_receive_session.acknowledge() + sleep(1) + active = True + except Empty: + sleep(pause_interval) + tot_time += pause_interval + dest_receiver.close() + dest_receive_session.close() + src_receiver.close() + src_receive_session.close() + sender.close() + send_session.close() + self.assertTrue(active, "Bridge failed to become active after %ds: %s" % (bridge_ready_timeout, bridge)) + + #--- Find/create/delete utility functions + + def _create_and_bind(self, qmf_broker, exchange_args, queue_args, binding_args): + """ + Create a binding between a named exchange and queue on a broker + """ + e = self._find_create_qmf_exchange(qmf_broker, **exchange_args) + q = self._find_create_qmf_queue(qmf_broker, **queue_args) + return self._find_create_qmf_binding(qmf_broker, e, q, **binding_args) + + def _check_alt_exchange(self, qmf_broker, alt_exch_name, alt_exch_type, alt_exch_op): + """ + Check for existence of alternate exchange. Return the Qmf exchange proxy object for the alternate exchange + """ + if len(alt_exch_name) == 0: return None + if alt_exch_op == _alt_exch_ops.create: + return self._find_create_qmf_exchange(qmf_broker=qmf_broker, name=alt_exch_name, type=alt_exch_type, + alternate="", durable=False, auto_delete=False, args={}) + if alt_exch_op == _alt_exch_ops.delete: + return self._find_delete_qmf_exchange(qmf_broker=qmf_broker, name=alt_exch_name, type=alt_exch_type, + alternate="", durable=False, auto_delete=False) + return self._find_qmf_exchange(qmf_broker=qmf_broker, name=alt_exchange_name, type=alt_exchange_type, + alternate="", durable=False, auto_delete=False) + + def _delete_queue_binding(self, qmf_broker, exchange_args, queue_args, binding_args): + """ + Delete a queue and the binding between it and the exchange + """ + e = self._find_qmf_exchange(qmf_broker, exchange_args["name"], exchange_args["type"], exchange_args["alternate"], exchange_args["durable"], exchange_args["auto_delete"]) + q = self._find_qmf_queue(qmf_broker, queue_args["name"], queue_args["alternate_exchange"], queue_args["durable"], queue_args["exclusive"], queue_args["auto_delete"]) + self._find_delete_qmf_binding(qmf_broker, e, q, **binding_args) + self._find_delete_qmf_queue(qmf_broker, **queue_args) + + def _create_route(self, queue_route_type_flag, src_broker, dest_broker, exch_name, queue_name, topic_key, + link_durable_flag, bridge_durable_flag, auth_mechanism, user_id, password, transport, + pause_interval = 1, link_ready_timeout = 20, bridge_ready_timeout = 20): + """ + Create a route from a source broker to a destination broker + """ + l = self._find_create_qmf_link(dest_broker.qmf_broker, src_broker.qmf_broker.getBroker(), link_durable_flag, + auth_mechanism, user_id, password, transport, pause_interval, link_ready_timeout) + self._links.append(l) + b = self._find_create_qmf_bridge(dest_broker.qmf_broker.getBroker(), l, queue_name, exch_name, topic_key, + queue_route_type_flag, bridge_durable_flag) + self._bridges.append(b) + self._wait_for_bridge(b, src_broker, dest_broker, exch_name, queue_name, topic_key, pause_interval, bridge_ready_timeout) + + # Parameterized test - entry point for tests + + def _do_test(self, + test_name, # Name of test + exch_name = "amq.direct", # Remote exchange name + exch_type = "direct", # Remote exchange type + exch_alt_exch = "", # Remote exchange alternate exchange + exch_alt_exch_type = "direct", # Remote exchange alternate exchange type + exch_durable_flag = False, # Remote exchange durability + exch_auto_delete_flag = False, # Remote exchange auto-delete property + exch_x_args = {}, # Remote exchange args + queue_alt_exch = "", # Remote queue alternate exchange + queue_alt_exch_type = "direct", # Remote queue alternate exchange type + queue_durable_flag = False, # Remote queue durability + queue_exclusive_flag = False, # Remote queue exclusive property + queue_auto_delete_flag = False, # Remote queue auto-delete property + queue_x_args = {}, # Remote queue args + binding_durable_flag = False, # Remote binding durability + binding_x_args = {}, # Remote binding args + topic_key = "", # Binding key For remote topic exchanges only + msg_count = 10, # Number of messages to send + msg_durable_flag = False, # Message durability + link_durable_flag = False, # Route link durability + bridge_durable_flag = False, # Route bridge durability + queue_route_type_flag = False, # Route type: false = bridge route, true = queue route + enq_txn_size = 0, # Enqueue transaction size, 0 = no transactions + deq_txn_size = 0, # Dequeue transaction size, 0 = no transactions + local_cluster_flag = False, # Use a node from the local cluster, otherwise use single local broker + remote_cluster_flag = False, # Use a node from the remote cluster, otherwise use single remote broker + alt_exch_op = _alt_exch_ops.create,# Op on alt exch [create (ensure present), delete (ensure not present), none (neither create nor delete)] + auth_mechanism = "", # Authorization mechanism for linked broker + user_id = "", # User ID for authorization on linked broker + password = "", # Password for authorization on linked broker + transport = "tcp" # Transport for route to linked broker + ): + """ + Parameterized federation test. Sets up a federated link between a source broker and a destination broker and + checks that messages correctly pass over the link to the destination. Where appropriate (non-queue-routes), also + checks for the presence of messages on the source broker. + + In these tests, the concept is to create a LOCAL broker, then create a link to a REMOTE broker using federation. + In other words, the messages sent to the LOCAL broker will be replicated on the REMOTE broker, and tests are + performed on the REMOTE broker to check that the required messages are present. In the case of regular routes, + the LOCAL broker will also retain the messages, and a similar test is performed on this broker. + + TODO: There are several items to improve here: + 1. _do_test() is rather general. Rather create a version for each exchange type and test the exchange/queue + interaction in more detail based on the exchange type + 2. Add a headers and an xml exchange type + 3. Restructure the tests to start and stop brokers and clusters directly rather than relying on previously + started brokers. Then persistence can be checked by stopping and restarting the brokers/clusters. In particular, + test the persistence of links and bridges, both of which take a persistence flag. + 4. Test the behavior of the alternate exchanges when messages are sourced through a link. Also check behavior + when the alternate exchange is not present or is deleted after the reference is made. + 5. Test special queue types (eg LVQ) + """ + local_broker = self._get_broker(local_cluster_flag, "local-port", "local-cluster-ports") + remote_broker = self._get_broker(remote_cluster_flag, "remote-port", "remote-cluster-ports") + + # Check alternate exchanges exist (and create them if not) on both local and remote brokers + self._check_alt_exchange(local_broker.qmf_broker, exch_alt_exch, exch_alt_exch_type, alt_exch_op) + self._check_alt_exchange(local_broker.qmf_broker, queue_alt_exch, queue_alt_exch_type, alt_exch_op) + self._check_alt_exchange(remote_broker.qmf_broker, exch_alt_exch, exch_alt_exch_type, alt_exch_op) + self._check_alt_exchange(remote_broker.qmf_broker, queue_alt_exch, queue_alt_exch_type, alt_exch_op) + + queue_name = "queue_%s" % test_name + exchange_args = {"name": exch_name, "type": exch_type, "alternate": exch_alt_exch, + "durable": exch_durable_flag, "auto_delete": exch_auto_delete_flag, "args": exch_x_args} + queue_args = {"name": queue_name, "alternate_exchange": queue_alt_exch, "durable": queue_durable_flag, + "exclusive": queue_exclusive_flag, "auto_delete": queue_auto_delete_flag, "args": queue_x_args} + binding_args = {"binding_args": binding_x_args} + if exch_type == "topic": + self.assertTrue(len(topic_key) > 0, "Topic exchange selected, but no topic key was set.") + binding_args["binding_key"] = topic_key + elif exch_type == "direct": + binding_args["binding_key"] = queue_name + else: + binding_args["binding_key"] = "" + self._create_and_bind(qmf_broker=local_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + self._create_and_bind(qmf_broker=remote_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + self._create_route(queue_route_type_flag, local_broker, remote_broker, exch_name, queue_name, topic_key, + link_durable_flag, bridge_durable_flag, auth_mechanism, user_id, password, transport) + + self._send_msgs("send_session", local_broker, addr = self._get_send_address(exch_name, queue_name), + msg_count = msg_count, topic_key = topic_key, msg_durable_flag = msg_durable_flag, enq_txn_size = enq_txn_size) + if not queue_route_type_flag: + self._receive_msgs("local_receive_session", local_broker, addr = queue_name, msg_count = msg_count, deq_txn_size = deq_txn_size) + self._receive_msgs("remote_receive_session", remote_broker, addr = queue_name, msg_count = msg_count, deq_txn_size = deq_txn_size, timeout = 5) + + # Clean up + self._delete_queue_binding(qmf_broker=local_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + self._delete_queue_binding(qmf_broker=remote_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + +class A_ShortTests(QmfTestBase010): + + def test_route_defaultExch(self): + self._do_test(self._get_name()) + + def test_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True) + + +class A_LongTests(QmfTestBase010): + + def test_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct") + + def test_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True) + + + def test_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange") + + def test_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True) + + + def test_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout") + + def test_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True) + + + def test_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#") + + def test_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True) + + +class B_ShortTransactionTests(QmfTestBase010): + + def test_txEnq01_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1) + + def test_txEnq01_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_txDeq01_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class B_LongTransactionTests(QmfTestBase010): + + def test_txEnq10_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + + + + def test_txEnq01_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1) + + def test_txEnq01_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1) + + def test_txEnq01_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1) + + def test_txEnq01_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class C_ShortClusterTests(QmfTestBase010): + + def test_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), local_cluster_flag=True) + + def test_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), remote_cluster_flag=True) + + def test_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class C_LongClusterTests(QmfTestBase010): + + def test_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", local_cluster_flag=True) + + def test_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", remote_cluster_flag=True) + + def test_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", local_cluster_flag=True) + + def test_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", remote_cluster_flag=True) + + def test_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", local_cluster_flag=True) + + def test_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", remote_cluster_flag=True) + + def test_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class D_ShortClusterTransactionTests(QmfTestBase010): + + def test_txEnq01_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + +class D_LongClusterTransactionTests(QmfTestBase010): + + def test_txEnq10_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + +class E_ShortPersistenceTests(QmfTestBase010): + + def test_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True) + + def test_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + +class E_LongPersistenceTests(QmfTestBase010): + + + def test_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True) + + def test_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + + def test_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True) + + def test_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + + def test_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True) + + def test_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + +class F_ShortPersistenceTransactionTests(QmfTestBase010): + + def test_txEnq01_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_txDeq01_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class F_LongPersistenceTransactionTests(QmfTestBase010): + + def test_txEnq10_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + + + + def test_txEnq01_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class G_ShortPersistenceClusterTests(QmfTestBase010): + + def test_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class G_LongPersistenceClusterTests(QmfTestBase010): + + + + def test_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class H_ShortPersistenceClusterTransactionTests(QmfTestBase010): + + def test_txEnq01_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + +class H_LongPersistenceClusterTransactionTests(QmfTestBase010): + + def test_txEnq10_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + + + + def test_txEnq01_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + diff --git a/qpid/cpp/src/tests/ipv6_test b/qpid/cpp/src/tests/ipv6_test new file mode 100755 index 0000000000..d75d50fd0a --- /dev/null +++ b/qpid/cpp/src/tests/ipv6_test @@ -0,0 +1,150 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run a simple test over IPv6 +source ./test_env.sh + +CONFIG=$(dirname $0)/config.null +TEST_HOSTNAME=::1 +COUNT=10 + +trap cleanup EXIT + +error() { echo $*; exit 1; } + +# Don't need --no-module-dir or --no-data-dir as they are set as env vars in test_env.sh +COMMON_OPTS="--daemon --auth no --config $CONFIG" + +# Record all broker ports started +unset PORTS +declare -a PORTS + +# Start new brokers: +# $1 must be integer +# $2 = extra opts +# Append used ports to PORTS variable +start_brokers() { + local -a ports + for (( i=0; $i<$1; i++)) do + ports[$i]=$($QPIDD_EXEC --port 0 $COMMON_OPTS $2) + done + PORTS=( ${PORTS[@]} ${ports[@]} ) +} + +stop_brokers() { + for port in "${PORTS[@]}"; + do + $QPIDD_EXEC -qp $port + done + PORTS=() +} + +cleanup() { + stop_brokers +} + +start_brokers 1 +PORT=${PORTS[0]} +echo "Started IPv6 smoke perftest on broker port $PORT" + +## Test connection via connection settings +./qpid-perftest --count ${COUNT} --port ${PORT} -b $TEST_HOSTNAME --summary + +## Test connection with a URL +URL="amqp:[$TEST_HOSTNAME]:$PORT" + +./qpid-send -b $URL --content-string=hello -a "foo;{create:always}" +MSG=`./qpid-receive -b $URL -a "foo;{create:always}" --messages 1` +test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; } + +stop_brokers + +# Federation smoke test follows + +# Start 2 brokers + +# In a distribution, the python tools will be absent. +if [ ! -f $QPID_CONFIG_EXEC ] || [ ! -f $QPID_ROUTE_EXEC ] ; then + echo "python tools absent - skipping federation test." +else + + start_brokers 2 + echo "Started Federated brokers on ports ${PORTS[*]}" + # Make broker urls + BROKER0="[::1]:${PORTS[0]}" + BROKER1="[::1]:${PORTS[1]}" + TEST_QUEUE=ipv6-fed-test + + $QPID_CONFIG_EXEC -a $BROKER0 add queue $TEST_QUEUE + $QPID_CONFIG_EXEC -a $BROKER1 add queue $TEST_QUEUE + $QPID_ROUTE_EXEC dynamic add $BROKER1 $BROKER0 amq.direct + $QPID_CONFIG_EXEC -a $BROKER1 bind amq.direct $TEST_QUEUE $TEST_QUEUE + $QPID_ROUTE_EXEC route map $BROKER1 + + ./datagen --count 100 | tee rdata-in | + ./qpid-send -b amqp:$BROKER0 -a amq.direct/$TEST_QUEUE --content-stdin + ./qpid-receive -b amqp:$BROKER1 -a $TEST_QUEUE --print-content yes -m 0 > rdata-out + + cmp rdata-in rdata-out || { echo "Federated data over IPv6 does not compare"; exit 1; } + + stop_brokers + rm rdata-in rdata-out +fi + +# Cluster smoke test follows +test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported. + +## Test failover in a cluster using IPv6 only +. $srcdir/ais_check # Will exit if clustering not enabled. + +pick_port() { + # We need a fixed port to set --cluster-url. Use qpidd to pick a free port. + # Note this method is racy + PICK=$($QPIDD_EXEC -dp0) + $QPIDD_EXEC -qp $PICK + echo $PICK +} + +ssl_cluster_broker() { # $1 = port + $QPIDD_EXEC $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ipv6_test.$HOSTNAME.$$ --cluster-url amqp:[$TEST_HOSTNAME]:$1 --port $1 + # Wait for broker to be ready + ./qpid-ping -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } + echo "Running IPv6 cluster broker on port $1" +} + +PORT1=`pick_port`; ssl_cluster_broker $PORT1 +PORT2=`pick_port`; ssl_cluster_broker $PORT2 + +# Pipe receive output to uniq to remove duplicates +./qpid-receive --connection-options "{reconnect:true, reconnect-timeout:5}" --failover-updates -b amqp:[$TEST_HOSTNAME]:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & + +./qpid-send -b amqp:[$TEST_HOSTNAME]:$PORT2 --content-string=one -a "foo;{create:always}" + +$QPIDD_EXEC -qp $PORT1 # Kill broker 1 receiver should fail-over. +./qpid-send -b amqp:[$TEST_HOSTNAME]:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1 +wait # Wait for qpid-receive +{ echo one; echo two; } > ssl_test_receive.cmp +diff ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; } + +$QPIDD_EXEC -qp $PORT2 + +rm -f ssl_test_receive.* + diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 6138108558..300d34774f 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -215,7 +215,8 @@ class ReadyReceiver: raise Exception("Receiver error: %s"%(out)) raise Exception("Timed out waiting for receivers to be ready") -def flatten(l): return sum(map(lambda s: s.split(","), l),[]) +def flatten(l): + return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) class RoundRobin: def __init__(self,items): diff --git a/qpid/cpp/src/tests/qpid-perftest.cpp b/qpid/cpp/src/tests/qpid-perftest.cpp index 8a5cf05775..3aff742c62 100644 --- a/qpid/cpp/src/tests/qpid-perftest.cpp +++ b/qpid/cpp/src/tests/qpid-perftest.cpp @@ -396,7 +396,7 @@ struct Controller : public Client { void run() { // Controller try { // Wait for subscribers to be ready. - process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready")); + process(opts.totalSubs, fqn("sub_ready"), boost::bind(expect, _1, "ready")); LocalQueue pubDone; LocalQueue subDone; @@ -510,10 +510,11 @@ struct PublishThread : public Client { } SubscriptionManager subs(session); LocalQueue lq; - subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, fqn("pub_start")); + subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); + Subscription cs = subs.subscribe(lq, fqn("pub_start")); for (size_t j = 0; j < opts.iterations; ++j) { + cs.grantMessageCredit(1); expect(lq.pop().getData(), "start"); AbsTime start=now(); for (size_t i=0; i /dev/null +if (( $? == 0 )); then + CLUSTERING_ENABLED=1 +else + echo "WARNING: No clustering detected; tests using it will be ignored." +fi + +# Test for long test +if [[ "$1" == "LONG_TEST" ]]; then + USE_LONG_TEST=1 + shift # get rid of this param so it is not treated as a test name +fi + +trap stop_brokers INT TERM QUIT + +SKIPTESTS="-i federation_sys.E_* -i federation_sys.F_* -i federation_sys.G_* -i federation_sys.H_*" +if [ -z ${USE_LONG_TEST} ]; then + SKIPTESTS="-i federation_sys.A_Long* -i federation_sys.B_Long* ${SKIPTESTS}" +fi +echo "WARNING: Tests using persistence will be ignored." +if [ -z ${CLUSTERING_ENABLED} ]; then + SKIPTESTS="${SKIPTESTS} -i federation_sys.C_* -i federation_sys.D_*" +elif [ -z ${USE_LONG_TEST} ]; then + SKIPTESTS="${SKIPTESTS} -i federation_sys.C_Long* -i federation_sys.D_Long*" +fi + +start_brokers() { + start_broker() { + ${QPIDD_EXEC} --daemon --port 0 --auth no --no-data-dir $1 > qpidd.port + PORT=`cat qpidd.port` + eval "$2=${PORT}" + } + start_broker "" LOCAL_PORT + start_broker "" REMOTE_PORT + if [ -n "${CLUSTERING_ENABLED}" ]; then + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_1 + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_2 + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_1 + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_2 + fi + rm qpidd.port +} + +stop_brokers() { + ${QPIDD_EXEC} -q --port ${LOCAL_PORT} + ${QPIDD_EXEC} -q --port ${REMOTE_PORT} + if [ -n "${CLUSTERING_ENABLED}" ]; then + ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C1_1} + ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C2_1} + fi +} + +if test -d ${PYTHON_DIR} ; then + start_brokers + if [ -z ${CLUSTERING_ENABLED} ]; then + echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT} (NOTE: clustering is DISABLED)" + else + echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT}, local cluster nodes ${CLUSTER_C1_1} ${CLUSTER_C1_2}, remote cluster nodes ${CLUSTER_C2_1} ${CLUSTER_C2_2}" + fi + if [ -z ${USE_LONG_TEST} ]; then + echo "NOTE: To run a full set of federation system tests, use \"make check-long\". To test with persistence, run the store version of this script." + fi + ${QPID_PYTHON_TEST} -m ${MODULENAME} ${SKIPTESTS} -b localhost:${REMOTE_PORT} -Dlocal-port=${LOCAL_PORT} -Dremote-port=${REMOTE_PORT} -Dlocal-cluster-ports="${CLUSTER_C1_1} ${CLUSTER_C1_2}" -Dremote-cluster-ports="${CLUSTER_C2_1} ${CLUSTER_C2_2}" $@ + RETCODE=$? + stop_brokers + if test x${RETCODE} != x0; then + echo "FAIL federation tests"; exit 1; + fi +fi diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests index 590f74746e..14af4807ba 100755 --- a/qpid/cpp/src/tests/run_federation_tests +++ b/qpid/cpp/src/tests/run_federation_tests @@ -55,7 +55,7 @@ stop_brokers() { if test -d ${PYTHON_DIR} ; then start_brokers echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2" - $QPID_PYTHON_TEST -m federation $SKIPTESTS -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@ + $QPID_PYTHON_TEST -m federation "$SKIPTESTS" -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@ RETCODE=$? stop_brokers if test x$RETCODE != x0; then diff --git a/qpid/cpp/src/tests/run_long_federation_sys_tests b/qpid/cpp/src/tests/run_long_federation_sys_tests new file mode 100644 index 0000000000..69dc08d11c --- /dev/null +++ b/qpid/cpp/src/tests/run_long_federation_sys_tests @@ -0,0 +1,24 @@ +#! /bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run the federation system tests (long version). + +./run_federation_sys_tests LONG_TEST $@ diff --git a/qpid/cpp/src/tests/run_store_tests.ps1 b/qpid/cpp/src/tests/run_store_tests.ps1 index 76b46737f0..b2f0b1ccd8 100644 --- a/qpid/cpp/src/tests/run_store_tests.ps1 +++ b/qpid/cpp/src/tests/run_store_tests.ps1 @@ -111,7 +111,7 @@ Invoke-Expression "$prog --quit --port $env:QPID_PORT" | Write-Output # Test 2... store.py starts/stops/restarts its own brokers $tests = "*" -$env:PYTHONPATH="$PYTHON_DIR;$srcdir" +$env:PYTHONPATH="$PYTHON_DIR;$QMF_LIB;$srcdir" $env:QPIDD_EXEC="$prog" $env:STORE_LIB="$store_dir\store$suffix.dll" if ($test_store -eq "MSSQL") { diff --git a/qpid/cpp/src/tests/sasl_test_setup.sh b/qpid/cpp/src/tests/sasl_test_setup.sh index 6395ba6ec3..3e69c0f02b 100755 --- a/qpid/cpp/src/tests/sasl_test_setup.sh +++ b/qpid/cpp/src/tests/sasl_test_setup.sh @@ -30,6 +30,7 @@ pwcheck_method: auxprop auxprop_plugin: sasldb sasldb_path: $PWD/sasl_config/qpidd.sasldb sql_select: dummy select +mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL EOF # Populate temporary sasl db. -- cgit v1.2.1