diff options
author | Kim van der Riet <kpvdr@apache.org> | 2011-07-19 19:57:06 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2011-07-19 19:57:06 +0000 |
commit | b877c84924aa9c90caa54623735e5f754b124781 (patch) | |
tree | 2e7f4e1b3c7b630f0bed5df66cf2fd960f18fa6d | |
parent | fff763944c9bca13f6fcf2eed8c88ba30db8fdd0 (diff) | |
download | qpid-python-b877c84924aa9c90caa54623735e5f754b124781.tar.gz |
QPID-702656 Patch from Gordon Sim plus tests which detect the condition being solved. Added a make check-long target to the Makefile in the cpp dir to make it easier to run the long tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1148503 13f79535-47bb-0310-9956-ffa450edef68
22 files changed, 2471 insertions, 105 deletions
diff --git a/qpid/cpp/Makefile.am b/qpid/cpp/Makefile.am index 01b8507454..9f4b8e2082 100644 --- a/qpid/cpp/Makefile.am +++ b/qpid/cpp/Makefile.am @@ -33,3 +33,7 @@ SUBDIRS = managementgen etc src docs/api docs/man examples bindings/qmf bindings # Update libtool, if needed. libtool: $(LIBTOOL_DEPS) $(SHELL) ./config.status --recheck + +check-long: all + $(MAKE) -C src/tests check-long +
\ No newline at end of file diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index f80e0f1e61..6eaf16b052 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -434,8 +434,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); - QPID_LOG (debug, "Broker::connect()"); string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; + QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport << + "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); return Manageable::STATUS_NOT_IMPLEMENTED; @@ -452,9 +453,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) status = Manageable::STATUS_OK; - else + else return Manageable::STATUS_PARAMETER_INVALID; break; } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 58dcc6d7c7..11970db394 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, ui { id = deliveryId; if (msg.payload->getRedelivered()){ - msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true); + msg.payload->setRedelivered(); } msg.payload->adjustTtl(); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 622cc81002..d68845062d 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { if (parent->sequence){ parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo); } if (parent->ive) { parent->lastMsg = &( msg.getMessage()); @@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) } void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { - msg->getProperties<DeliveryProperties>()->setExchange(getName()); + msg->setExchange(getName()); } bool Exchange::routeWithAlternate(Deliverable& msg) diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index d694d1eafd..28886826ca 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -30,7 +30,9 @@ #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include <boost/pointer_cast.hpp> #include <time.h> @@ -51,18 +53,9 @@ Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), - inCallback(false), requiredCredit(0), isManagementMessage(false) + inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) {} -Message::Message(const Message& original) : - PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), dequeueCallback(0), - inCallback(false), requiredCredit(0) -{ - setExpiryPolicy(original.expiryPolicy); -} - Message::~Message() {} void Message::forcePersistent() @@ -288,6 +281,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) sys::Mutex::ScopedLock l(lock); Relay f(out); frames.map_if(f, TypeFilter<HEADER_BODY>()); + //as frame (and pointer to body) has now been passed to handler, + //subsequent modifications should use a copy + copyHeaderOnWrite = true; } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -342,11 +338,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const return false; } +class CloneHeaderBody +{ +public: + void operator()(AMQFrame& f) + { + f.cloneBody(); + } +}; + +AMQHeaderBody* Message::getHeaderBody() +{ + if (copyHeaderOnWrite) { + CloneHeaderBody f; + frames.map_if(f, TypeFilter<HEADER_BODY>()); + copyHeaderOnWrite = false; + } + return frames.getHeaders(); +} + void Message::addTraceId(const std::string& id) { sys::Mutex::ScopedLock l(lock); if (isA<MessageTransferBody>()) { - FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); + FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); @@ -360,7 +375,8 @@ void Message::addTraceId(const std::string& id) void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -382,9 +398,9 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::adjustTtl() { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { - sys::Mutex::ScopedLock l(lock); if (expiration < FAR_FUTURE) { sys::AbsTime current( expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); @@ -395,6 +411,42 @@ void Message::adjustTtl() } } +void Message::setRedelivered() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true); +} + +void Message::insertCustomProperty(const std::string& key, int64_t value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value); +} + +void Message::insertCustomProperty(const std::string& key, const std::string& value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value); +} + +void Message::removeCustomProperty(const std::string& key) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key); +} + +void Message::setExchange(const std::string& exchange) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<DeliveryProperties>()->setExchange(exchange); +} + +void Message::clearApplicationHeadersFlag() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag(); +} + void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } @@ -442,11 +494,6 @@ uint8_t Message::getPriority() const { return getAdapter().getPriority(frames); } -framing::FieldTable& Message::getOrInsertHeaders() -{ - return getProperties<MessageProperties>()->getApplicationHeaders(); -} - bool Message::getIsManagementMessage() const { return isManagementMessage; } void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index e1d6c60a80..a9cb246a6f 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -29,13 +29,17 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <memory> #include <string> #include <vector> namespace qpid { namespace framing { +class AMQBody; +class AMQHeaderBody; class FieldTable; class SequenceNumber; } @@ -53,7 +57,6 @@ public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); - QPID_BROKER_EXTERN Message(const Message&); QPID_BROKER_EXTERN ~Message(); uint64_t getPersistenceId() const { return persistenceId; } @@ -75,7 +78,6 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; QPID_BROKER_EXTERN std::string getAppId() const; - framing::FieldTable& getOrInsertHeaders(); QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); @@ -85,18 +87,19 @@ public: sys::AbsTime getExpiration() const { return expiration; } void setExpiration(sys::AbsTime exp) { expiration = exp; } void adjustTtl(); + void setRedelivered(); + void insertCustomProperty(const std::string& key, int64_t value); + void insertCustomProperty(const std::string& key, const std::string& value); + void removeCustomProperty(const std::string& key); + void setExchange(const std::string&); + void clearApplicationHeadersFlag(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } - template <class T> T* getProperties() { - qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); - } - template <class T> const T* getProperties() const { const qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); + return p->get<T>(); } template <class T> const T* hasProperties() const { @@ -156,9 +159,8 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - void forcePersistent(); - bool isForcedPersistent(); - + void forcePersistent(); + bool isForcedPersistent(); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); @@ -178,7 +180,7 @@ public: bool redelivered; bool loaded; bool staged; - bool forcePersistentPolicy; // used to force message as durable, via a broker policy + bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; @@ -194,6 +196,15 @@ public: uint32_t requiredCredit; bool isManagementMessage; + mutable bool copyHeaderOnWrite; + + /** + * Expects lock to be held + */ + template <class T> T* getModifiableProperties() { + return getHeaderBody()->get<T>(true); + } + qpid::framing::AMQHeaderBody* getHeaderBody(); }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 42923567a2..dd3f982699 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -525,7 +525,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); listeners.populate(copy); @@ -627,11 +627,6 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg } if (traceId.size()) { - //copy on write: take deep copy of message before modifying it - //as the frames may already be available for delivery on other - //threads - boost::intrusive_ptr<Message> copy(new Message(*msg)); - msg = copy; msg->addTraceId(traceId); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index afe5b8ac3a..f306517d37 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -318,22 +318,22 @@ class MessageUpdater { lastPos = message.position; // if the ttl > 0, we need to send the calculated expiration time to the updatee - if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) { + const DeliveryProperties* dprops = + message.payload->getProperties<DeliveryProperties>(); + if (dprops && dprops->getTtl() > 0) { bool hadMessageProps = message.payload->hasProperties<framing::MessageProperties>(); - framing::MessageProperties* mprops = + const framing::MessageProperties* mprops = message.payload->getProperties<framing::MessageProperties>(); bool hadApplicationHeaders = mprops->hasApplicationHeaders(); - FieldTable& applicationHeaders = mprops->getApplicationHeaders(); - applicationHeaders.setInt64( - UpdateClient::X_QPID_EXPIRATION, - sys::Duration(sys::EPOCH, message.payload->getExpiration())); + message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); // If message properties or application headers didn't exist // prior to us adding data, we want to remove them on the other side. if (!hadMessageProps) - applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); else if (!hadApplicationHeaders) - applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0); + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp index e830459aba..cb1376004e 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -49,18 +49,18 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& // Copy expiration from x-property if present. if (msg->hasProperties<MessageProperties>()) { - MessageProperties* mprops = msg->getProperties<MessageProperties>(); + const MessageProperties* mprops = msg->getProperties<MessageProperties>(); if (mprops->hasApplicationHeaders()) { - FieldTable& headers = mprops->getApplicationHeaders(); + const FieldTable& headers = mprops->getApplicationHeaders(); if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { msg->setExpiration( sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); - headers.erase(UpdateClient::X_QPID_EXPIRATION); + msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION); // Erase props/headers that were added by the UpdateClient if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) msg->eraseProperties<MessageProperties>(); else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) - mprops->clearApplicationHeadersFlag(); + msg->clearApplicationHeadersFlag(); } } } diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index cd60cd971f..5b9673f0d0 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer) return true; } +void AMQFrame::cloneBody() +{ + body = body->clone(); +} + std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { return diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index c669d12bc0..59c4a7501f 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -59,6 +59,11 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock return boost::polymorphic_downcast<const T*>(getBody()); } + /** + * Take a deep copy of the body currently referenced + */ + void cloneBody(); + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; QPID_COMMON_EXTERN bool decode(Buffer& buffer); QPID_COMMON_EXTERN uint32_t encodedSize() const; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 923005b9fc..50fdc82ee0 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -614,7 +614,7 @@ void ManagementAgent::sendBufferLH(const string& data, props->setAppId("qmf2"); for (i = headers.begin(); i != headers.end(); ++i) { - msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + msg->insertCustomProperty(i->first, i->second.asString()); } DeliveryProperties* dp = diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp index b7d52372f4..0ced4d9161 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued) { boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); - FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); - headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); - headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); - headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position); + msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); + msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE); + msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position); route(msg); } diff --git a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp index 4b6d25ac7d..89a2bf516d 100644 --- a/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable } else { queue->setPosition(seqno1); - FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - headers.erase(QUEUE_MESSAGE_POSITION); + msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE); + msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO); + msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE); + msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION); msg.deliverTo(queue); QPID_LOG(debug, "Enqueued replicated message onto " << queueName); if (mgmtExchange != 0) { diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index 88a1cd99c2..fe72f42a46 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -253,7 +253,7 @@ QPID_AUTO_TEST_CASE(testIVEOption) TopicExchange topic ("topic1", false, args); intrusive_ptr<Message> msg1 = cmessage("direct1", "abc"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("a", "abc"); + msg1->insertCustomProperty("a", "abc"); DeliverableMessage dmsg1(msg1); FieldTable args2; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index ed97c41bff..1a6e56dbfd 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -298,7 +298,7 @@ TESTS_ENVIRONMENT = \ $(srcdir)/run_test system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \ +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ run_queue_flow_limit_tests @@ -315,6 +315,8 @@ EXTRA_DIST += \ config.null \ ais_check \ run_federation_tests \ + run_federation_sys_tests \ + run_long_federation_sys_tests \ run_cli_tests \ run_acl_tests \ .valgrind.supp \ @@ -352,6 +354,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) # Not run under valgrind, too slow LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \ + run_long_federation_sys_tests \ run_failover_soak reliable_replication_test \ federated_cluster_test_with_node_failure diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 6fdc4c69ad..d94a5cab7f 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -81,13 +81,14 @@ public: Message& getMessage() { return *(msg.get()); } }; -intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) { +intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { intrusive_ptr<Message> msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl); return msg; } @@ -437,10 +438,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); - msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); - msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"b"); + msg3->insertCustomProperty(key,"c"); + msg4->insertCustomProperty(key,"a"); //enqueue 4 message queue->deliver(msg1); @@ -462,9 +463,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ intrusive_ptr<Message> msg5 = create_message("e", "A"); intrusive_ptr<Message> msg6 = create_message("e", "B"); intrusive_ptr<Message> msg7 = create_message("e", "C"); - msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); - msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg5->insertCustomProperty(key,"a"); + msg6->insertCustomProperty(key,"b"); + msg7->insertCustomProperty(key,"c"); queue->deliver(msg5); queue->deliver(msg6); queue->deliver(msg7); @@ -499,7 +500,7 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); queue->deliver(msg1); queue->deliver(msg2); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); @@ -531,12 +532,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); - msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); - msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); - msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"b"); + msg3->insertCustomProperty(key,"c"); + msg4->insertCustomProperty(key,"a"); + msg5->insertCustomProperty(key,"b"); + msg6->insertCustomProperty(key,"c"); //enqueue 4 message queue->deliver(msg1); @@ -600,8 +601,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"a"); queue1->deliver(msg1); queue2->deliver(msg1); @@ -644,8 +645,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ args.getLVQKey(key); BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); - msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg1->insertCustomProperty(key,"a"); + msg2->insertCustomProperty(key,"a"); // 3 queue1->deliver(msg1); // 4 @@ -665,12 +666,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = create_message("exchange", "key"); - if (i % 2) { - if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); - } else { - if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); - } + intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index 210abf0a5b..152581e4ba 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -50,10 +50,9 @@ struct TxPublishTest TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), - msg(MessageUtils::createMessage("exchange", "routing_key", false, "id")), + msg(MessageUtils::createMessage("exchange", "routing_key", true)), op(msg) { - msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); op.deliverTo(queue1); op.deliverTo(queue2); } diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 4a98c638a2..fd972b4394 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -251,7 +251,7 @@ class Broker(Popen): def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None): + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -280,6 +280,7 @@ class Broker(Popen): cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] + if show_cmd: print cmd Popen.__init__(self, cmd, expect, stdout=PIPE) test.cleanup_stop(self) self._host = "127.0.0.1" @@ -400,7 +401,7 @@ class Cluster: _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count @@ -411,16 +412,16 @@ class Cluster: self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", BrokerTest.cluster_lib ] - self.start_n(count, expect=expect, wait=wait) + self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) - def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) - self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] - def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): - for i in range(count): self.start(expect=expect, wait=wait, args=args) + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): + for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -477,18 +478,18 @@ class BrokerTest(TestCase): self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level) + b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait) + cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster def browse(self, session, queue, timeout=0): diff --git a/qpid/cpp/src/tests/federation_sys.py b/qpid/cpp/src/tests/federation_sys.py new file mode 100755 index 0000000000..8c963d6aa9 --- /dev/null +++ b/qpid/cpp/src/tests/federation_sys.py @@ -0,0 +1,2180 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from inspect import stack +from qpid import messaging +from qpid.messaging import Message +from qpid.messaging.exceptions import Empty +from qpid.testlib import TestBase010 +from random import randint +from sys import stdout +from time import sleep + + +class Enum(object): + def __init__(self, **entries): + self.__dict__.update(entries) + def __repr__(self): + args = ['%s=%s' % (k, repr(v)) for (k,v) in vars(self).items()] + return 'Enum(%s)' % ', '.join(args) + + +class QmfTestBase010(TestBase010): + + _brokers = [] + _links = [] + _bridges = [] + _alt_exch_ops = Enum(none=0, create=1, delete=2) + + class _Broker(object): + """ + This broker proxy object holds the Qmf proxy to a broker of known address as well as the QMF broker + object, connection and sessions to the broker. + """ + def __init__(self, url): + self.url = url # format: "host:port" + url_parts = url.split(':') + self.host = url_parts[0] + self.port = int(url_parts[1]) + self.qmf_broker = None + self.connection = messaging.Connection.establish(self.url) + self.sessions = [] + def __str__(self): + return "_Broker %s:%s (%d open sessions)" % (self.host, self.port, len(self.sessions)) + def destroy(self, qmf = None): + if qmf is not None: + qmf.delBroker(self.qmf_broker.getBroker()) + for session in self.sessions: + try: # Session may have been closed by broker error + session.close() + except Exception as (e): print "WARNING: %s: Unable to close session %s (%s): %s %s" % (self, session, hex(id(session)), type(e), e) + try: # Connection may have been closed by broker error + self.connection.close() + except Exception as (e): print "WARNING: %s: Unable to close connection %s (%s): %s %s" % (self, self.connection, hex(id(self.connection)), type(e), e) + def session(self, name, transactional_flag = False): + session = self.connection.session(name, transactional_flag) + self.sessions.append(session) + return session + + def setUp(self): + """ + Called one before each test starts + """ + TestBase010.setUp(self) + self.startQmf(); + + def tearDown(self): + """ + Called once after each test competes. Close all Qmf objects (bridges, links and brokers) + """ + while len(self._bridges): + self._bridges.pop().close() + while len(self._links): + self._links.pop().close() + while len(self._brokers): + b = self._brokers.pop() + if len(self._brokers) <= 1: + b.destroy(None) + else: + b.destroy(self.qmf) + TestBase010.tearDown(self) + self.qmf.close() + + #--- General test utility functions + + def _get_name(self): + """ + Return the name of method which called this method stripped of "test_" prefix. Used for naming + queues and exchanges on a per-test basis. + """ + return stack()[1][3][5:] + + def _get_broker_port(self, key): + """ + Get the port of a broker defined in the environment using -D<key>=portno + """ + return int(self.defines[key]) + + def _get_cluster_ports(self, key): + """ + Get the cluster ports from the parameters of the test which place it in the environment using + -D<key>="port0 port1 ... portN" (space-separated) + """ + ports = [] + ports_str = self.defines[key] + if ports_str: + for p in ports_str.split(): + ports.append(int(p)) + return ports + + def _get_send_address(self, exch_name, queue_name): + """ + Get an address to which to send messages based on the exchange name and queue name, but taking into account + that the exchange name may be "" (the default exchange), in whcih case the format changes slightly. + """ + if len(exch_name) == 0: # Default exchange + return queue_name + return "%s/%s" % (exch_name, queue_name) + + def _get_broker(self, cluster_flag, broker_port_key, cluster_ports_key): + """ + Read the port numbers for pre-started brokers from the environment using keys, then find or create and return + the Qmf broker proxy for the appropriate broker + """ + if cluster_flag: + port = self._get_cluster_ports(cluster_ports_key)[0] # Always use the first node in the cluster + else: + port = self._get_broker_port(broker_port_key) + return self._find_create_broker("localhost:%s" % port) + + def _get_msg_subject(self, topic_key): + """ + Return an appropriate subject for sending a message to a known topic. Return None if there is no topic. + """ + if len(topic_key) == 0: return None + if "*" in topic_key: return topic_key.replace("*", "test") + if "#" in topic_key: return topic_key.replace("#", "multipart.test") + return topic_key + + def _send_msgs(self, session_name, broker, addr, msg_count, msg_content = "Message_%03d", topic_key = "", + msg_durable_flag = False, enq_txn_size = 0): + """ + Send messages to a broker using address addr + """ + send_session = broker.session(session_name, transactional_flag = enq_txn_size > 0) + sender = send_session.sender(addr) + txn_cnt = 0 + for i in range(0, msg_count): + sender.send(Message(msg_content % (i + 1), subject = self._get_msg_subject(topic_key), durable = msg_durable_flag)) + if enq_txn_size > 0: + txn_cnt += 1 + if txn_cnt >= enq_txn_size: + send_session.commit() + txn_cnt = 0 + if enq_txn_size > 0 and txn_cnt > 0: + send_session.commit() + sender.close() + send_session.close() + + def _receive_msgs(self, session_name, broker, addr, msg_count, msg_content = "Message_%03d", deq_txn_size = 0, + timeout = 0): + """ + Receive messages from a broker + """ + receive_session = broker.session(session_name, transactional_flag = deq_txn_size > 0) + receiver = receive_session.receiver(addr) + txn_cnt = 0 + for i in range(0, msg_count): + try: + msg = receiver.fetch(timeout = timeout) + if deq_txn_size > 0: + txn_cnt += 1 + if txn_cnt >= deq_txn_size: + receive_session.commit() + txn_cnt = 0 + receive_session.acknowledge() + except Empty: + if deq_txn_size > 0: receive_session.rollback() + receiver.close() + receive_session.close() + if i == 0: + self.fail("Broker %s queue \"%s\" is empty" % (broker.qmf_broker.getBroker().getUrl(), addr)) + else: + self.fail("Unable to receive message %d from broker %s queue \"%s\"" % (i, broker.qmf_broker.getBroker().getUrl(), addr)) + if msg.content != msg_content % (i + 1): + receiver.close() + receive_session.close() + self.fail("Unexpected message \"%s\", was expecting \"%s\"." % (msg.content, msg_content % (i + 1))) + try: + msg = receiver.fetch(timeout = 0) + if deq_txn_size > 0: receive_session.rollback() + receiver.close() + receive_session.close() + self.fail("Extra message \"%s\" found on broker %s address \"%s\"" % (msg.content, broker.qmf_broker.getBroker().getUrl(), addr)) + except Empty: + pass + if deq_txn_size > 0 and txn_cnt > 0: + receive_session.commit() + receiver.close() + receive_session.close() + + #--- QMF-specific utility functions + + def _get_qmf_property(self, props, key): + """ + Get the value of a named property key kj from a property list [(k0, v0), (k1, v1), ... (kn, vn)]. + """ + for k,v in props: + if k.name == key: + return v + return None + + def _check_qmf_return(self, method_result): + """ + Check the result of a Qmf-defined method call + """ + self.assertTrue(method_result.status == 0, method_result.text) + + def _check_optional_qmf_property(self, qmf_broker, type, qmf_object, key, expected_val, obj_ref_flag): + """ + Optional Qmf properties don't show up in the properties list when they are not specified. Checks for + these property types involve searching the properties list and making sure it is present or not as + expected. + """ + val = self._get_qmf_property(qmf_object.getProperties(), key) + if val is None: + if len(expected_val) > 0: + self.fail("%s %s exists, but has does not have %s property. Expected value: \"%s\"" % + (type, qmf_object.name, key, expected_val)) + else: + if len(expected_val) > 0: + if obj_ref_flag: + obj = self.qmf.getObjects(_objectId = val, _broker = qmf_broker.getBroker()) + self.assertEqual(len(obj), 1, "More than one object with the same objectId: %s" % obj) + val = obj[0].name + self.assertEqual(val, expected_val, "%s %s exists, but has incorrect %s property. Found \"%s\", expected \"%s\"" % + (type, qmf_object.name, key, val, expected_val)) + else: + self.fail("%s %s exists, but has an unexpected %s property \"%s\" set." % (type, qmf_object.name, key, val)) + + #--- Find/create Qmf broker objects + + def _find_qmf_broker(self, url): + """ + Find the Qmf broker object for the given broker URL. The broker must have been previously added to Qmf through + addBroker() + """ + for b in self.qmf.getObjects(_class="broker"): + if b.getBroker().getUrl() == url: + return b + return None + + def _find_create_broker(self, url): + """ + Find a running broker through Qmf. If it does not exist, add it (assuming the broker is already running). + """ + broker = self._Broker(url) + self._brokers.append(broker) + if self.qmf is not None: + qmf_broker = self._find_qmf_broker(broker.url) + if qmf_broker is None: + self.qmf.addBroker(broker.url) + broker.qmf_broker = self._find_qmf_broker(broker.url) + else: + broker.qmf_broker = qmf_broker + return broker + + #--- Find/create/delete exchanges + + def _find_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete): + """ + Find Qmf exchange object + """ + for e in self.qmf.getObjects(_class="exchange", _broker = qmf_broker.getBroker()): + if e.name == name: + if len(name) == 0 or (len(name) >= 4 and name[:4] == "amq."): return e # skip checks for special exchanges + self.assertEqual(e.type, type, + "Exchange \"%s\" exists, but is of unexpected type %s; expected type %s." % + (name, e.type, type)) + self._check_optional_qmf_property(qmf_broker, "Exchange", e, "altExchange", alternate, True) + self.assertEqual(e.durable, durable, + "Exchange \"%s\" exists, but has incorrect durability. Found durable=%s, expected durable=%s" % + (name, e.durable, durable)) + self.assertEqual(e.autoDelete, auto_delete, + "Exchange \"%s\" exists, but has incorrect auto-delete property. Found %s, expected %s" % + (name, e.autoDelete, auto_delete)) + return e + return None + + def _find_create_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete, args): + """ + Find Qmf exchange object if exchange exists, create exchange and return its Qmf object if not + """ + e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete) + if e is not None: return e + # Does not exist, so create it + props = dict({"exchange-type": type, "type": type, "durable": durable, "auto-delete": auto_delete, "alternate-exchange": alternate}, **args) + self._check_qmf_return(qmf_broker.create(type="exchange", name=name, properties=props, strict=True)) + e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete) + self.assertNotEqual(e, None, "Creation of exchange %s on broker %s failed" % (name, qmf_broker.getBroker().getUrl())) + return e + + def _find_delete_qmf_exchange(self, qmf_broker, name, type, alternate, durable, auto_delete): + """ + Find and delete Qmf exchange object if it exists + """ + e = self._find_qmf_exchange(qmf_broker, name, type, alternate, durable, auto_delete) + if e is not None and not auto_delete: + self._check_qmf_return(qmf_broker.delete(type="exchange", name=name, options={})) + + #--- Find/create/delete queues + + def _find_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete): + """ + Find a Qmf queue object + """ + for q in self.qmf.getObjects(_class="queue", _broker = qmf_broker.getBroker()): + if q.name == name: + self._check_optional_qmf_property(qmf_broker, "Queue", q, "altExchange", alternate_exchange, True) + self.assertEqual(q.durable, durable, + "Queue \"%s\" exists, but has incorrect durable property. Found %s, expected %s" % + (name, q.durable, durable)) + self.assertEqual(q.exclusive, exclusive, + "Queue \"%s\" exists, but has incorrect exclusive property. Found %s, expected %s" % + (name, q.exclusive, exclusive)) + self.assertEqual(q.autoDelete, auto_delete, + "Queue \"%s\" exists, but has incorrect auto-delete property. Found %s, expected %s" % + (name, q.autoDelete, auto_delete)) + return q + return None + + def _find_create_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete, args): + """ + Find Qmf queue object if queue exists, create queue and return its Qmf object if not + """ + q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete) + if q is not None: return q + # Queue does not exist, so create it + props = dict({"durable": durable, "auto-delete": auto_delete, "exclusive": exclusive, "alternate-exchange": alternate_exchange}, **args) + self._check_qmf_return(qmf_broker.create(type="queue", name=name, properties=props, strict=True)) + q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete) + self.assertNotEqual(q, None, "Creation of queue %s on broker %s failed" % (name, qmf_broker.getBroker().getUrl())) + return q + + def _find_delete_qmf_queue(self, qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete, args): + """ + Find and delete Qmf queue object if it exists + """ + q = self._find_qmf_queue(qmf_broker, name, alternate_exchange, durable, exclusive, auto_delete) + if q is not None and not auto_delete: + self._check_qmf_return(qmf_broker.delete(type="queue", name=name, options={})) + + #--- Find/create/delete bindings (between an exchange and a queue) + + def _find_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args): + """ + Find a Qmf binding object + """ + for b in self.qmf.getObjects(_class="binding", _broker = qmf_broker.getBroker()): + if b.exchangeRef == qmf_exchange.getObjectId() and b.queueRef == qmf_queue.getObjectId(): + if qmf_exchange.type != "fanout": # Fanout ignores the binding key, and always returns "" as the key + self.assertEqual(b.bindingKey, binding_key, + "Binding between exchange %s and queue %s exists, but has mismatching binding key: Found %s, expected %s." % + (qmf_exchange.name, qmf_queue.name, b.bindingKey, binding_key)) + self.assertEqual(b.arguments, binding_args, + "Binding between exchange %s and queue %s exists, but has mismatching arguments: Found %s, expected %s" % + (qmf_exchange.name, qmf_queue.name, b.arguments, binding_args)) + return b + return None + + def _find_create_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args): + """ + Find Qmf binding object if it exists, create binding and return its Qmf object if not + """ + b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args) + if b is not None: return b + # Does not exist, so create it + self._check_qmf_return(qmf_broker.create(type="binding", name="%s/%s/%s" % (qmf_exchange.name, qmf_queue.name, binding_key), properties=binding_args, strict=True)) + b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args) + self.assertNotEqual(b, None, "Creation of binding between exchange %s and queue %s with key %s failed" % + (qmf_exchange.name, qmf_queue.name, binding_key)) + return b + + def _find_delete_qmf_binding(self, qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args): + """ + Find and delete Qmf binding object if it exists + """ + b = self._find_qmf_binding(qmf_broker, qmf_exchange, qmf_queue, binding_key, binding_args) + if b is not None: + if len(qmf_exchange.name) > 0: # not default exchange + self._check_qmf_return(qmf_broker.delete(type="binding", name="%s/%s/%s" % (qmf_exchange.name, qmf_queue.name, binding_key), options={})) + + #--- Find/create a link + + def _find_qmf_link(self, qmf_from_broker_proxy, host, port): + """ + Find a Qmf link object + """ + for l in self.qmf.getObjects(_class="link", _broker=qmf_from_broker_proxy): + if l.host == host and l.port == port: + return l + return None + + def _find_create_qmf_link(self, qmf_from_broker, qmf_to_broker_proxy, link_durable_flag, auth_mechanism, user_id, + password, transport, pause_interval, link_ready_timeout): + """ + Find a Qmf link object if it exists, create it and return its Qmf link object if not + """ + to_broker_host = qmf_to_broker_proxy.host + to_broker_port = qmf_to_broker_proxy.port + l = self._find_qmf_link(qmf_from_broker.getBroker(), to_broker_host, to_broker_port) + if l is not None: return l + # Does not exist, so create it + self._check_qmf_return(qmf_from_broker.connect(to_broker_host, to_broker_port, link_durable_flag, auth_mechanism, user_id, password, transport)) + l = self._find_qmf_link(qmf_from_broker.getBroker(), to_broker_host, to_broker_port) + self.assertNotEqual(l, None, "Creation of link from broker %s to broker %s failed" % + (qmf_from_broker.getBroker().getUrl(), qmf_to_broker_proxy.getUrl())) + self._wait_for_link(l, pause_interval, link_ready_timeout) + return l + + def _wait_for_link(self, link, pause_interval, link_ready_timeout): + """ + Wait for link to become active (state=Operational) + """ + tot_time = 0 + link.update() + if link.state == "": + # Link mgmt updates for the c++ link object are disabled when in a cluster because of inconsistent state: + # one is "Operational", the other "Passive". In this case, wait a bit and hope for the best... + sleep(2*pause_interval) + else: + while link.state != "Operational" and tot_time < link_ready_timeout: + sleep(pause_interval) + tot_time += pause_interval + link.update() + self.assertEqual(link.state, "Operational", "Timeout: Link not operational, state=%s" % link.state) + + #--- Find/create a bridge + + def _find_qmf_bridge(self, qmf_broker_proxy, qmf_link, source, destination, key): + """ + Find a Qmf link object + """ + for b in self.qmf.getObjects(_class="bridge", _broker=qmf_broker_proxy): + if b.linkRef == qmf_link.getObjectId() and b.src == source and b.dest == destination and b.key == key: + return b + return None + + def _find_create_qmf_bridge(self, qmf_broker_proxy, qmf_link, queue_name, exch_name, topic_key, + queue_route_type_flag, bridge_durable_flag): + """ + Find a Qmf bridge object if it exists, create it and return its Qmf object if not + """ + if queue_route_type_flag: + src = queue_name + dest = exch_name + key = "" + else: + src = exch_name + dest = exch_name + if len(topic_key) > 0: + key = topic_key + else: + key = queue_name + b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key) + if b is not None: + return b + # Does not exist, so create it + self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1)) + b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key) + self.assertNotEqual(b, None, "Bridge creation failed: src=%s dest=%s key=%s" % (src, dest, key)) + return b + + def _wait_for_bridge(self, bridge, src_broker, dest_broker, exch_name, queue_name, topic_key, pause_interval, + bridge_ready_timeout): + """ + Wait for bridge to become active by sending messages over the bridge at 1 sec intervals until they are + observed at the destination. + """ + tot_time = 0 + active = False + send_session = src_broker.session("tx") + sender = send_session.sender(self._get_send_address(exch_name, queue_name)) + src_receive_session = src_broker.session("src_rx") + src_receiver = src_receive_session.receiver(queue_name) + dest_receive_session = dest_broker.session("dest_rx") + dest_receiver = dest_receive_session.receiver(queue_name) + while not active and tot_time < bridge_ready_timeout: + sender.send(Message("xyz123", subject = self._get_msg_subject(topic_key))) + try: + src_receiver.fetch(timeout = 0) + src_receive_session.acknowledge() + # Keep receiving msgs, as several may have accumulated + while True: + dest_receiver.fetch(timeout = 0) + dest_receive_session.acknowledge() + sleep(1) + active = True + except Empty: + sleep(pause_interval) + tot_time += pause_interval + dest_receiver.close() + dest_receive_session.close() + src_receiver.close() + src_receive_session.close() + sender.close() + send_session.close() + self.assertTrue(active, "Bridge failed to become active after %ds: %s" % (bridge_ready_timeout, bridge)) + + #--- Find/create/delete utility functions + + def _create_and_bind(self, qmf_broker, exchange_args, queue_args, binding_args): + """ + Create a binding between a named exchange and queue on a broker + """ + e = self._find_create_qmf_exchange(qmf_broker, **exchange_args) + q = self._find_create_qmf_queue(qmf_broker, **queue_args) + return self._find_create_qmf_binding(qmf_broker, e, q, **binding_args) + + def _check_alt_exchange(self, qmf_broker, alt_exch_name, alt_exch_type, alt_exch_op): + """ + Check for existence of alternate exchange. Return the Qmf exchange proxy object for the alternate exchange + """ + if len(alt_exch_name) == 0: return None + if alt_exch_op == _alt_exch_ops.create: + return self._find_create_qmf_exchange(qmf_broker=qmf_broker, name=alt_exch_name, type=alt_exch_type, + alternate="", durable=False, auto_delete=False, args={}) + if alt_exch_op == _alt_exch_ops.delete: + return self._find_delete_qmf_exchange(qmf_broker=qmf_broker, name=alt_exch_name, type=alt_exch_type, + alternate="", durable=False, auto_delete=False) + return self._find_qmf_exchange(qmf_broker=qmf_broker, name=alt_exchange_name, type=alt_exchange_type, + alternate="", durable=False, auto_delete=False) + + def _delete_queue_binding(self, qmf_broker, exchange_args, queue_args, binding_args): + """ + Delete a queue and the binding between it and the exchange + """ + e = self._find_qmf_exchange(qmf_broker, exchange_args["name"], exchange_args["type"], exchange_args["alternate"], exchange_args["durable"], exchange_args["auto_delete"]) + q = self._find_qmf_queue(qmf_broker, queue_args["name"], queue_args["alternate_exchange"], queue_args["durable"], queue_args["exclusive"], queue_args["auto_delete"]) + self._find_delete_qmf_binding(qmf_broker, e, q, **binding_args) + self._find_delete_qmf_queue(qmf_broker, **queue_args) + + def _create_route(self, queue_route_type_flag, src_broker, dest_broker, exch_name, queue_name, topic_key, + link_durable_flag, bridge_durable_flag, auth_mechanism, user_id, password, transport, + pause_interval = 1, link_ready_timeout = 20, bridge_ready_timeout = 20): + """ + Create a route from a source broker to a destination broker + """ + l = self._find_create_qmf_link(dest_broker.qmf_broker, src_broker.qmf_broker.getBroker(), link_durable_flag, + auth_mechanism, user_id, password, transport, pause_interval, link_ready_timeout) + self._links.append(l) + b = self._find_create_qmf_bridge(dest_broker.qmf_broker.getBroker(), l, queue_name, exch_name, topic_key, + queue_route_type_flag, bridge_durable_flag) + self._bridges.append(b) + self._wait_for_bridge(b, src_broker, dest_broker, exch_name, queue_name, topic_key, pause_interval, bridge_ready_timeout) + + # Parameterized test - entry point for tests + + def _do_test(self, + test_name, # Name of test + exch_name = "", # Remote exchange name + exch_type = "direct", # Remote exchange type + exch_alt_exch = "", # Remote exchange alternate exchange + exch_alt_exch_type = "direct", # Remote exchange alternate exchange type + exch_durable_flag = False, # Remote exchange durability + exch_auto_delete_flag = False, # Remote exchange auto-delete property + exch_x_args = {}, # Remote exchange args + queue_alt_exch = "", # Remote queue alternate exchange + queue_alt_exch_type = "direct", # Remote queue alternate exchange type + queue_durable_flag = False, # Remote queue durability + queue_exclusive_flag = False, # Remote queue exclusive property + queue_auto_delete_flag = False, # Remote queue auto-delete property + queue_x_args = {}, # Remote queue args + binding_durable_flag = False, # Remote binding durability + binding_x_args = {}, # Remote binding args + topic_key = "", # Binding key For remote topic exchanges only + msg_count = 10, # Number of messages to send + msg_durable_flag = False, # Message durability + link_durable_flag = False, # Route link durability + bridge_durable_flag = False, # Route bridge durability + queue_route_type_flag = False, # Route type: false = bridge route, true = queue route + enq_txn_size = 0, # Enqueue transaction size, 0 = no transactions + deq_txn_size = 0, # Dequeue transaction size, 0 = no transactions + local_cluster_flag = False, # Use a node from the local cluster, otherwise use single local broker + remote_cluster_flag = False, # Use a node from the remote cluster, otherwise use single remote broker + alt_exch_op = _alt_exch_ops.create,# Op on alt exch [create (ensure present), delete (ensure not present), none (neither create nor delete)] + auth_mechanism = "", # Authorization mechanism for linked broker + user_id = "", # User ID for authorization on linked broker + password = "", # Password for authorization on linked broker + transport = "tcp" # Transport for route to linked broker + ): + """ + Parameterized federation test. Sets up a federated link between a source broker and a destination broker and + checks that messages correctly pass over the link to the destination. Where appropriate (non-queue-routes), also + checks for the presence of messages on the source broker. + + In these tests, the concept is to create a LOCAL broker, then create a link to a REMOTE broker using federation. + In other words, the messages sent to the LOCAL broker will be replicated on the REMOTE broker, and tests are + performed on the REMOTE broker to check that the required messages are present. In the case of regular routes, + the LOCAL broker will also retain the messages, and a similar test is performed on this broker. + + TODO: There are several items to improve here: + 1. _do_test() is rather general. Rather create a version for each exchange type and test the exchange/queue + interaction in more detail based on the exchange type + 2. Add a headers and an xml exchange type + 3. Restructure the tests to start and stop brokers and clusters directly rather than relying on previously + started brokers. Then persistence can be checked by stopping and restarting the brokers/clusters. In particular, + test the persistence of links and bridges, both of which take a persistence flag. + 4. Test the behavior of the alternate exchanges when messages are sourced through a link. Also check behavior + when the alternate exchange is not present or is deleted after the reference is made. + 5. Test special queue types (eg LVQ) + """ + local_broker = self._get_broker(local_cluster_flag, "local-port", "local-cluster-ports") + remote_broker = self._get_broker(remote_cluster_flag, "remote-port", "remote-cluster-ports") + + # Check alternate exchanges exist (and create them if not) on both local and remote brokers + self._check_alt_exchange(local_broker.qmf_broker, exch_alt_exch, exch_alt_exch_type, alt_exch_op) + self._check_alt_exchange(local_broker.qmf_broker, queue_alt_exch, queue_alt_exch_type, alt_exch_op) + self._check_alt_exchange(remote_broker.qmf_broker, exch_alt_exch, exch_alt_exch_type, alt_exch_op) + self._check_alt_exchange(remote_broker.qmf_broker, queue_alt_exch, queue_alt_exch_type, alt_exch_op) + + queue_name = "queue_%s" % test_name + exchange_args = {"name": exch_name, "type": exch_type, "alternate": exch_alt_exch, + "durable": exch_durable_flag, "auto_delete": exch_auto_delete_flag, "args": exch_x_args} + queue_args = {"name": queue_name, "alternate_exchange": queue_alt_exch, "durable": queue_durable_flag, + "exclusive": queue_exclusive_flag, "auto_delete": queue_auto_delete_flag, "args": queue_x_args} + binding_args = {"binding_args": binding_x_args} + if exch_type == "topic": + self.assertTrue(len(topic_key) > 0, "Topic exchange selected, but no topic key was set.") + binding_args["binding_key"] = topic_key + elif exch_type == "direct": + binding_args["binding_key"] = queue_name + else: + binding_args["binding_key"] = "" + self._create_and_bind(qmf_broker=local_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + self._create_and_bind(qmf_broker=remote_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + self._create_route(queue_route_type_flag, local_broker, remote_broker, exch_name, queue_name, topic_key, + link_durable_flag, bridge_durable_flag, auth_mechanism, user_id, password, transport) + + self._send_msgs("send_session", local_broker, addr = self._get_send_address(exch_name, queue_name), + msg_count = msg_count, topic_key = topic_key, msg_durable_flag = msg_durable_flag, enq_txn_size = enq_txn_size) + if not queue_route_type_flag: + self._receive_msgs("local_receive_session", local_broker, addr = queue_name, msg_count = msg_count, deq_txn_size = deq_txn_size) + self._receive_msgs("remote_receive_session", remote_broker, addr = queue_name, msg_count = msg_count, deq_txn_size = deq_txn_size, timeout = 5) + + # Clean up + self._delete_queue_binding(qmf_broker=local_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + self._delete_queue_binding(qmf_broker=remote_broker.qmf_broker, exchange_args=exchange_args, queue_args=queue_args, binding_args=binding_args) + +class A_ShortTests(QmfTestBase010): + + def test_route_defaultExch(self): + self._do_test(self._get_name()) + + def test_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True) + + +class A_LongTests(QmfTestBase010): + + def test_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct") + + def test_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True) + + + def test_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange") + + def test_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True) + + + def test_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout") + + def test_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True) + + + def test_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#") + + def test_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True) + + +class B_ShortTransactionTests(QmfTestBase010): + + def test_txEnq01_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1) + + def test_txEnq01_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_txDeq01_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class B_LongTransactionTests(QmfTestBase010): + + def test_txEnq10_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + + def test_txEnq01_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1) + + def test_txEnq01_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1) + + def test_txEnq01_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1) + + def test_txEnq01_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1) + + def test_txEnq01_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class C_ShortClusterTests(QmfTestBase010): + + def test_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), local_cluster_flag=True) + + def test_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), remote_cluster_flag=True) + + def test_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class C_LongClusterTests(QmfTestBase010): + + def test_locCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", local_cluster_flag=True) + + def test_locCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", remote_cluster_flag=True) + + def test_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", local_cluster_flag=True) + + def test_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", remote_cluster_flag=True) + + def test_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", local_cluster_flag=True) + + def test_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", remote_cluster_flag=True) + + def test_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", local_cluster_flag=True) + + def test_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", remote_cluster_flag=True) + + def test_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class D_ShortClusterTransactionTests(QmfTestBase010): + + def test_txEnq01_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + +class D_LongClusterTransactionTests(QmfTestBase010): + + def test_txEnq10_locCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_defaultExch(self): + self._do_test(self._get_name(), enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_defaultExch(self): + self._do_test(self._get_name(), queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + +class E_ShortPersistenceTests(QmfTestBase010): + + def test_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True) + + def test_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + +class E_LongPersistenceTests(QmfTestBase010): + + def test_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True) + + def test_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + + def test_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True) + + def test_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + + def test_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True) + + def test_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + + def test_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True) + + def test_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True) + + def test_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True) + + def test_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True) + + +class F_ShortPersistenceTransactionTests(QmfTestBase010): + + def test_txEnq01_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_txDeq01_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class F_LongPersistenceTransactionTests(QmfTestBase010): + + def test_txEnq10_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + + def test_txEnq01_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + + def test_txEnq01_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq01_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1) + + def test_txEnq10_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq10_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103) + + def test_txEnq01_txDeq01_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + def test_txEnq01_txDeq01_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1) + + +class G_ShortPersistenceClusterTests(QmfTestBase010): + + def test_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class G_LongPersistenceClusterTests(QmfTestBase010): + + def test_locCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True) + + def test_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + def test_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, local_cluster_flag=True, remote_cluster_flag=True) + + +class H_ShortPersistenceClusterTransactionTests(QmfTestBase010): + + def test_txEnq01_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + +class H_LongPersistenceClusterTransactionTests(QmfTestBase010): + + def test_txEnq10_locCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_defaultExch(self): + self._do_test(self._get_name(), queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_defaultExch(self): + self._do_test(self._get_name(), msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_amqDirectExch(self): + self._do_test(self._get_name(), exch_name="amq.direct", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_directExch(self): + self._do_test(self._get_name(), exch_name="testDirectExchange", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_fanoutExch(self): + self._do_test(self._get_name(), exch_name="testFanoutExchange", exch_type="fanout", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + + def test_txEnq01_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq10_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True) + + def test_txEnq01_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq10_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq10_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=10, msg_count = 103, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_route_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + + def test_txEnq01_txDeq01_locCluster_remCluster_queueRoute_durMsg_durQueue_topicExch(self): + self._do_test(self._get_name(), exch_name="testTopicExchange", exch_type="topic", topic_key=self._get_name()+".#", msg_durable_flag=True, queue_durable_flag=True, queue_route_type_flag=True, enq_txn_size=1, deq_txn_size=1, local_cluster_flag=True, remote_cluster_flag=True) + diff --git a/qpid/cpp/src/tests/run_federation_sys_tests b/qpid/cpp/src/tests/run_federation_sys_tests new file mode 100755 index 0000000000..f5f772d72e --- /dev/null +++ b/qpid/cpp/src/tests/run_federation_sys_tests @@ -0,0 +1,97 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run the federation system tests. + +source ./test_env.sh + +MODULENAME=federation_sys + +# Test for clustering +ps -u root | grep 'aisexec\|corosync' > /dev/null +if (( $? == 0 )); then + CLUSTERING_ENABLED=1 +else + echo "WARNING: No clustering detected; tests using it will be ignored." +fi + +# Test for long test +if [[ "$1" == "LONG_TEST" ]]; then + USE_LONG_TEST=1 + shift # get rid of this param so it is not treated as a test name +fi + +trap stop_brokers INT TERM QUIT + +SKIPTESTS="-i federation_sys.E_* -i federation_sys.F_* -i federation_sys.G_* -i federation_sys.H_*" +if [ -z ${USE_LONG_TEST} ]; then + SKIPTESTS="-i federation_sys.A_Long* -i federation_sys.B_Long* ${SKIPTESTS}" +fi +echo "WARNING: Tests using persistence will be ignored." +if [ -z ${CLUSTERING_ENABLED} ]; then + SKIPTESTS="${SKIPTESTS} -i federation_sys.C_* -i federation_sys.D_*" +elif [ -z ${USE_LONG_TEST} ]; then + SKIPTESTS="${SKIPTESTS} -i federation_sys.C_Long* -i federation_sys.D_Long*" +fi + +start_brokers() { + start_broker() { + ${QPIDD_EXEC} --daemon --port 0 --auth no --no-data-dir $1 > qpidd.port + PORT=`cat qpidd.port` + eval "$2=${PORT}" + } + start_broker "" LOCAL_PORT + start_broker "" REMOTE_PORT + if [ -n "${CLUSTERING_ENABLED}" ]; then + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_1 + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_2 + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_1 + start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_2 + fi + rm qpidd.port +} + +stop_brokers() { + ${QPIDD_EXEC} -q --port ${LOCAL_PORT} + ${QPIDD_EXEC} -q --port ${REMOTE_PORT} + if [ -n "${CLUSTERING_ENABLED}" ]; then + ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C1_1} + ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C2_1} + fi +} + +if test -d ${PYTHON_DIR} ; then + start_brokers + if [ -z ${CLUSTERING_ENABLED} ]; then + echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT} (NOTE: clustering is DISABLED)" + else + echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT}, local cluster nodes ${CLUSTER_C1_1} ${CLUSTER_C1_2}, remote cluster nodes ${CLUSTER_C2_1} ${CLUSTER_C2_2}" + fi + if [ -z ${USE_LONG_TEST} ]; then + echo "NOTE: To run a full set of federation system tests, use \"make check-long\". To test with persistence, run the store version of this script." + fi + ${QPID_PYTHON_TEST} -m ${MODULENAME} ${SKIPTESTS} -b localhost:${REMOTE_PORT} -Dlocal-port=${LOCAL_PORT} -Dremote-port=${REMOTE_PORT} -Dlocal-cluster-ports="${CLUSTER_C1_1} ${CLUSTER_C1_2}" -Dremote-cluster-ports="${CLUSTER_C2_1} ${CLUSTER_C2_2}" $@ + RETCODE=$? + stop_brokers + if test x${RETCODE} != x0; then + echo "FAIL federation tests"; exit 1; + fi +fi diff --git a/qpid/cpp/src/tests/run_long_federation_sys_tests b/qpid/cpp/src/tests/run_long_federation_sys_tests new file mode 100644 index 0000000000..69dc08d11c --- /dev/null +++ b/qpid/cpp/src/tests/run_long_federation_sys_tests @@ -0,0 +1,24 @@ +#! /bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run the federation system tests (long version). + +./run_federation_sys_tests LONG_TEST $@ |