summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp25
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h19
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp12
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp45
-rw-r--r--cpp/src/qpid/broker/SessionState.h6
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp5
-rwxr-xr-xcpp/src/tests/python_tests2
-rw-r--r--cpp/xml/extra.xml29
-rw-r--r--python/cpp_failing_0-10.txt95
-rw-r--r--python/cpp_failing_0-10_preview.txt6
-rw-r--r--python/qpid/codec010.py10
-rw-r--r--python/qpid/testlib.py47
-rw-r--r--python/tests_0-10/alternate_exchange.py28
-rw-r--r--python/tests_0-10/broker.py38
-rw-r--r--python/tests_0-10/query.py233
-rw-r--r--python/tests_0-10_preview/__init__.py32
-rw-r--r--python/tests_0-10_preview/alternate_exchange.py179
-rw-r--r--python/tests_0-10_preview/broker.py111
-rw-r--r--python/tests_0-10_preview/dtx.py645
-rw-r--r--python/tests_0-10_preview/example.py95
-rw-r--r--python/tests_0-10_preview/exchange.py335
-rw-r--r--python/tests_0-10_preview/execution.py29
-rw-r--r--python/tests_0-10_preview/message.py834
-rw-r--r--python/tests_0-10_preview/persistence.py67
-rw-r--r--python/tests_0-10_preview/query.py227
-rw-r--r--python/tests_0-10_preview/queue.py338
-rw-r--r--python/tests_0-10_preview/testlib.py66
-rw-r--r--python/tests_0-10_preview/tx.py231
30 files changed, 3585 insertions, 206 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 6e92f89706..3314ec6be3 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -83,6 +83,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 15c29ed482..5ab0dd84a9 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -37,7 +37,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
HandlerImpl(s),
exchangeImpl(s),
queueImpl(s),
- messageImpl(s)
+ messageImpl(s),
+ executionImpl(s)
{}
@@ -377,6 +378,28 @@ void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
}
*/
+
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+ //TODO
+}
+
+void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/)
+{
+ //TODO
+}
+
+void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
+ uint32_t /*commandId*/,
+ uint8_t /*classCode*/,
+ uint8_t /*commandCode*/,
+ uint8_t /*fieldIndex*/,
+ const std::string& /*description*/,
+ const framing::FieldTable& /*errorInfo*/)
+{
+ //TODO
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 0dd3529359..c2d61392d7 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -55,6 +55,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Message010Handler* getMessage010Handler(){ return &messageImpl; }
Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
Queue010Handler* getQueue010Handler(){ return &queueImpl; }
+ Execution010Handler* getExecution010Handler(){ return &executionImpl; }
BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
@@ -172,9 +173,27 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
};
+ class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl
+ {
+ public:
+ ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+
+ void sync();
+ void result(uint32_t commandId, const string& value);
+ void exception(uint16_t errorCode,
+ uint32_t commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t fieldIndex,
+ const std::string& description,
+ const framing::FieldTable& errorInfo);
+
+ };
+
ExchangeHandlerImpl exchangeImpl;
QueueHandlerImpl queueImpl;
MessageHandlerImpl messageImpl;
+ ExecutionHandlerImpl executionImpl;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 919a3e6ee8..3baa3a89a7 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -69,12 +69,6 @@ void SessionHandler::handleIn(AMQFrame& f) {
QPID_MSG("Channel " << channel.get() << " is not open"));
}
}
- } catch(const ChannelException& e) {
- ignoring=true; // Ignore trailing frames sent by client.
- session->detach();
- session.reset();
- //TODO: implement new exception handling mechanism
- //peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -83,6 +77,12 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
+void SessionHandler::destroy() {
+ ignoring=true; // Ignore trailing frames sent by client.
+ session->detach();
+ session.reset();
+}
+
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 4b031f2951..fa013a1c15 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -70,6 +70,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void localSuspend();
void detach() { localSuspend(); }
void sendCompletion();
+ void destroy();
protected:
void handleIn(framing::AMQFrame&);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 571b8848ae..ceaa70db18 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -168,16 +168,16 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method)
+void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id)
{
- SequenceNumber id = nextIn++;
+ id = nextIn++;
Invoker::Result invocation = invoke(adapter, *method);
completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
} else if (invocation.hasResult()) {
- getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
sendCompletion();
@@ -185,16 +185,18 @@ void SessionState::handleCommand(framing::AMQMethodBody* method)
//TODO: if window gets too large send unsolicited completion
}
-void SessionState::handleContent(AMQFrame& frame)
+void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (!msg) {//start of frameset will be indicated by frame flags
- SequenceNumber id = nextIn++;
+ if (frame.getBof() && frame.getBos()) {//start of frameset
+ id = nextIn++;
msgBuilder.start(id);
msg = msgBuilder.getMessage();
+ } else {
+ id = msg->getCommandId();
}
msgBuilder.handle(frame);
- if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ if (frame.getEof() && frame.getEos()) {//end of frameset
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
@@ -210,16 +212,27 @@ void SessionState::handle(AMQFrame& frame)
{
received(frame);
- //TODO: make command handling more uniform, regardless of whether
- //commands carry content. (For now, assume all single frame
- //assmblies are non-content bearing and all content-bearing
- //assmeblies will have more than one frame):
- if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod());
- } else {
- handleContent(frame);
+ SequenceNumber commandId;
+ try {
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assemblies are non-content bearing and all content-bearing
+ //assemblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ handleContent(frame, commandId);
+ }
+ } catch(const ChannelException& e) {
+ //TODO: better implementation of new exception handling mechanism
+ AMQMethodBody* m = frame.getMethod();
+ if (m) {
+ getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
+ } else {
+ getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
+ }
+ handler->destroy();
}
-
}
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c936edee21..ecf0b41a7a 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -93,8 +93,6 @@ class SessionState : public framing::SessionState,
void activateOutput();
void handle(framing::AMQFrame& frame);
- void handleCommand(framing::AMQMethodBody* method);
- void handleContent(framing::AMQFrame& frame);
void complete(const framing::SequenceSet& ranges);
void sendCompletion();
@@ -138,8 +136,10 @@ class SessionState : public framing::SessionState,
RangedOperation ackOp;
management::Session::shared_ptr mgmtObject;
+ void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
+ void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
- friend class SessionManager;
+ friend class SessionManager;
};
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 3750f4a9a8..eeb658600d 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -49,12 +49,15 @@ uint32_t AMQFrame::frameOverhead() {
void AMQFrame::encode(Buffer& buffer) const
{
+ //set track first (controls on track 0, everything else on 1):
+ uint8_t track = getBody()->type() ? 1 : 0;
+
uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0);
buffer.putOctet(flags);
buffer.putOctet(getBody()->type());
buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself)
buffer.putOctet(0);
- buffer.putOctet(0x0f & subchannel);
+ buffer.putOctet(0x0f & track);
buffer.putShort(channel);
buffer.putLong(0);
body->encode(buffer);
diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests
index d9754ed0fb..73b3b970c1 100755
--- a/cpp/src/tests/python_tests
+++ b/cpp/src/tests/python_tests
@@ -1,7 +1,7 @@
#!/bin/sh
# Run the python tests.
if test -d ../../../python ; then
- cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS
+ cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10_preview.txt -b localhost:$QPID_PORT $PYTHON_TESTS && ./run-tests --skip-self-test -v -s "0-10" -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS
else
echo Warning: python tests not found.
fi
diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml
index 789887ae81..b838a466c4 100644
--- a/cpp/xml/extra.xml
+++ b/cpp/xml/extra.xml
@@ -582,6 +582,33 @@
</class>
+<class name="execution010" index="3">
+ <method name = "sync" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+ </method>
+ <method name = "result" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+ <field name="command-id" domain="command-id"/>
+ <field name="value" domain="long-struct"/>
+ </method>
+ <method name = "exception" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+ <field name="error-code" domain="short"/>
+ <field name="command-id" domain="long"/>
+ <field name="class-code" domain="octet"/>
+ <field name="command-code" domain="octet"/>
+ <field name="field-index" domain="octet"/>
+ <field name="description" domain="mediumstr"/>
+ <field name="error-info" domain="table"/>
+ </method>
+</class>
+
<class name="message010" index="4">
<doc>blah, blah</doc>
<method name = "transfer" index="1">
@@ -708,7 +735,7 @@
<field name="binding-key" domain="shortstr"/>
<field name="arguments" domain="table"/>
<result>
- <struct size="long" type="1">
+ <struct size="long" type="2">
<field name="exchange-not-found" domain="bit"/>
<field name="queue-not-found" domain="bit"/>
<field name="queue-not-matched" domain="bit"/>
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 77031cad08..974c165ae9 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -1,6 +1,97 @@
tests.codec.FieldTableTestCase.test_field_table_decode
tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
-tests_0-10.alternate_exchange.AlternateExchangeTests.test_immediate
+tests_0-10.query.QueryTests.test_exchange_bound_header
+tests_0-10.persistence.PersistenceTests.test_ack_message_from_deleted_queue
+tests_0-10.persistence.PersistenceTests.test_delete_queue_after_publish
+tests_0-10.persistence.PersistenceTests.test_queue_deletion
+tests_0-10.tx.TxTests.test_auto_rollback
+tests_0-10.tx.TxTests.test_commit
+tests_0-10.tx.TxTests.test_rollback
+tests_0-10.execution.ExecutionTests.test_flush
+tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_exchange
+tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_queue
+tests_0-10.alternate_exchange.AlternateExchangeTests.test_queue_delete
+tests_0-10.alternate_exchange.AlternateExchangeTests.test_unroutable
+tests_0-10.exchange.DeclareMethodPassiveFieldNotFoundRuleTests.test
+tests_0-10.exchange.DefaultExchangeRuleTests.testDefaultExchange
+tests_0-10.exchange.ExchangeTests.testHeadersBindNoMatchArg
+tests_0-10.exchange.HeadersExchangeTests.testMatchAll
+tests_0-10.exchange.HeadersExchangeTests.testMatchAny
+tests_0-10.exchange.MiscellaneousErrorsTests.testDifferentDeclaredType
+tests_0-10.exchange.MiscellaneousErrorsTests.testTypeNotKnown
+tests_0-10.exchange.RecommendedTypesRuleTests.testDirect
+tests_0-10.exchange.RecommendedTypesRuleTests.testFanout
+tests_0-10.exchange.RecommendedTypesRuleTests.testHeaders
+tests_0-10.exchange.RecommendedTypesRuleTests.testTopic
+tests_0-10.exchange.RequiredInstancesRuleTests.testAmqDirect
+tests_0-10.exchange.RequiredInstancesRuleTests.testAmqFanOut
+tests_0-10.exchange.RequiredInstancesRuleTests.testAmqMatch
+tests_0-10.exchange.RequiredInstancesRuleTests.testAmqTopic
+tests_0-10.dtx.DtxTests.test_bad_resume
+tests_0-10.dtx.DtxTests.test_end
+tests_0-10.dtx.DtxTests.test_end_suspend_and_fail
+tests_0-10.dtx.DtxTests.test_end_unknown_xid
+tests_0-10.dtx.DtxTests.test_forget_xid_on_completion
+tests_0-10.dtx.DtxTests.test_get_timeout
+tests_0-10.dtx.DtxTests.test_implicit_end
+tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_false
+tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_true
+tests_0-10.dtx.DtxTests.test_recover
+tests_0-10.dtx.DtxTests.test_select_required
+tests_0-10.dtx.DtxTests.test_set_timeout
+tests_0-10.dtx.DtxTests.test_simple_commit
+tests_0-10.dtx.DtxTests.test_simple_prepare_commit
+tests_0-10.dtx.DtxTests.test_simple_prepare_rollback
+tests_0-10.dtx.DtxTests.test_simple_rollback
+tests_0-10.dtx.DtxTests.test_start_already_known
+tests_0-10.dtx.DtxTests.test_start_join
+tests_0-10.dtx.DtxTests.test_start_join_and_resume
+tests_0-10.dtx.DtxTests.test_suspend_resume
+tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
+tests_0-10.message.MessageTests.test_ack
+tests_0-10.message.MessageTests.test_acquire
+tests_0-10.message.MessageTests.test_cancel
+tests_0-10.message.MessageTests.test_consume_exclusive
+tests_0-10.message.MessageTests.test_consume_no_local
+tests_0-10.message.MessageTests.test_consume_no_local_awkward
+tests_0-10.message.MessageTests.test_consume_queue_errors
+tests_0-10.message.MessageTests.test_consume_unique_consumers
+tests_0-10.message.MessageTests.test_credit_flow_bytes
+tests_0-10.message.MessageTests.test_credit_flow_messages
+tests_0-10.message.MessageTests.test_no_size
+tests_0-10.message.MessageTests.test_qos_prefetch_count
+tests_0-10.message.MessageTests.test_qos_prefetch_size
+tests_0-10.message.MessageTests.test_ranged_ack
+tests_0-10.message.MessageTests.test_recover
+tests_0-10.message.MessageTests.test_recover_requeue
+tests_0-10.message.MessageTests.test_reject
+tests_0-10.message.MessageTests.test_release
+tests_0-10.message.MessageTests.test_release_ordering
+tests_0-10.message.MessageTests.test_release_unacquired
+tests_0-10.message.MessageTests.test_subscribe_not_acquired
+tests_0-10.message.MessageTests.test_subscribe_not_acquired_2
+tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
+tests_0-10.message.MessageTests.test_window_flow_bytes
+tests_0-10.message.MessageTests.test_window_flow_messages
+tests_0-10.testlib.TestBaseTest.testAssertEmptyFail
+tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
+tests_0-10.testlib.TestBaseTest.testMessageProperties
+tests_0-10.queue.QueueTests.test_autodelete_shared
+tests_0-10.queue.QueueTests.test_bind
+tests_0-10.queue.QueueTests.test_declare_exclusive
+tests_0-10.queue.QueueTests.test_declare_passive
+tests_0-10.queue.QueueTests.test_delete_ifempty
+tests_0-10.queue.QueueTests.test_delete_ifunused
+tests_0-10.queue.QueueTests.test_delete_simple
+tests_0-10.queue.QueueTests.test_purge
+tests_0-10.queue.QueueTests.test_unbind_direct
+tests_0-10.queue.QueueTests.test_unbind_fanout
+tests_0-10.queue.QueueTests.test_unbind_headers
+tests_0-10.queue.QueueTests.test_unbind_topic
tests_0-10.broker.BrokerTests.test_closed_channel
-
+tests_0-10.broker.BrokerTests.test_ack_and_no_ack
+tests_0-10.broker.BrokerTests.test_invalid_channel
+tests_0-10.broker.BrokerTests.test_simple_delivery_queued
+tests_0-10.broker.BrokerTests.test_simple_delivery_immediate
+tests_0-10.example.ExampleTest.test_example
diff --git a/python/cpp_failing_0-10_preview.txt b/python/cpp_failing_0-10_preview.txt
new file mode 100644
index 0000000000..91c2a7fce4
--- /dev/null
+++ b/python/cpp_failing_0-10_preview.txt
@@ -0,0 +1,6 @@
+tests.codec.FieldTableTestCase.test_field_table_decode
+tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
+tests.codec.FieldTableTestCase.test_field_table_name_value_pair
+tests_0-10_preview.alternate_exchange.AlternateExchangeTests.test_immediate
+tests_0-10_preview.broker.BrokerTests.test_closed_channel
+
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py
index 5894981fc6..2dcba4e917 100644
--- a/python/qpid/codec010.py
+++ b/python/qpid/codec010.py
@@ -111,6 +111,11 @@ class Codec(Packer):
def write_str8(self, s):
self.write_vbin8(s.encode("utf8"))
+ def read_str16(self):
+ return self.read_vbin16().decode("utf8")
+ def write_str16(self, s):
+ self.write_vbin16(s.encode("utf8"))
+
def read_vbin16(self):
return self.read(self.read_uint16())
@@ -125,9 +130,10 @@ class Codec(Packer):
self.write(b)
def write_map(self, m):
- pass
+ self.write_uint32(0) #hack
def read_map(self):
- pass
+ size = self.read_uint32() #hack
+ self.read(size) #hack
def write_array(self, a):
pass
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 5174fe10f4..e8e54b3a56 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -29,6 +29,11 @@ from getopt import getopt, GetoptError
from qpid.content import Content
from qpid.message import Message
+#0-10 support
+from qpid.connection010 import Connection
+from qpid.spec010 import load
+from qpid.util import connect
+
def findmodules(root):
"""Find potential python modules under directory root"""
found = []
@@ -125,23 +130,29 @@ Options:
if opt in ("-S", "--skip-self-test"): self.skip_self_test = True
if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value
# Abbreviations for default settings.
- if (self.specfile == "0-8"):
- self.specfile = self.get_spec_file("amqp.0-8.xml")
- elif (self.specfile == "0-9"):
- self.specfile = self.get_spec_file("amqp.0-9.xml")
- self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
-
- if (self.specfile == None):
- self._die("No XML specification provided")
- print "Using specification from:", self.specfile
- self.spec = qpid.spec.load(self.specfile, *self.errata)
+ if (self.specfile == "0-10"):
+ self.spec = load(self.get_spec_file("amqp.0-10.xml"))
+ else:
+ if (self.specfile == "0-8"):
+ self.specfile = self.get_spec_file("amqp.0-8.xml")
+ elif (self.specfile == "0-9"):
+ self.specfile = self.get_spec_file("amqp.0-9.xml")
+ self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
+
+ if (self.specfile == None):
+ self._die("No XML specification provided")
+ print "Using specification from:", self.specfile
+
+ self.spec = qpid.spec.load(self.specfile, *self.errata)
if len(self.tests) == 0:
if not self.skip_self_test:
self.tests=findmodules("tests")
if self.use08spec():
self.tests+=findmodules("tests_0-8")
- elif (self.spec.major == 0 and self.spec.minor == 10) or (self.spec.major == 99 and self.spec.minor == 0):
+ elif (self.spec.major == 99 and self.spec.minor == 0):
+ self.tests+=findmodules("tests_0-10_preview")
+ elif (self.spec.major == 0 and self.spec.minor == 10):
self.tests+=findmodules("tests_0-10")
else:
self.tests+=findmodules("tests_0-9")
@@ -330,3 +341,17 @@ class TestBase(unittest.TestCase):
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
+class TestBase010(unittest.TestCase):
+ """
+ Base class for Qpid test cases. using the final 0-10 spec
+ """
+
+ def setUp(self):
+ spec = testrunner.spec
+ self.conn = Connection(connect(testrunner.host, testrunner.port), spec)
+ self.conn.start(timeout=10)
+ self.session = self.conn.session("test-session", timeout=10)
+
+ def tearDown(self):
+ self.session.close(timeout=10)
+ self.conn.close(timeout=10)
diff --git a/python/tests_0-10/alternate_exchange.py b/python/tests_0-10/alternate_exchange.py
index 83f8d85811..225b3cfb69 100644
--- a/python/tests_0-10/alternate_exchange.py
+++ b/python/tests_0-10/alternate_exchange.py
@@ -93,34 +93,6 @@ class AlternateExchangeTests(TestBase):
self.assertEqual("Three", dlq.get(timeout=1).content.body)
self.assertEmpty(dlq)
-
- def test_immediate(self):
- """
- Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
- """
- channel = self.channel
- #set up a 'dead letter queue':
- channel.exchange_declare(exchange="dlq", type="fanout")
- channel.queue_declare(queue="immediate", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="dlq", queue="immediate")
- self.subscribe(destination="dlq", queue="immediate")
- dlq = self.client.queue("dlq")
-
- #create a queue using the dlq as its alternate exchange:
- channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True, auto_delete=True)
- #send it some messages:
- #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK
- channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"}))
-
- #check the messages were delivered to the dlq:
- self.assertEqual("no one wants me", dlq.get(timeout=1).content.body)
- self.assertEmpty(dlq)
-
- #cleanup:
- channel.queue_delete(queue="no-consumers")
- channel.exchange_delete(exchange="dlq")
-
-
def test_delete_while_used_by_queue(self):
"""
Ensure an exchange still in use as an alternate-exchange for a
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py
index 99936ba742..bfecb5c166 100644
--- a/python/tests_0-10/broker.py
+++ b/python/tests_0-10/broker.py
@@ -19,9 +19,10 @@
from qpid.client import Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
-class BrokerTests(TestBase):
+class BrokerTests(TestBase010):
"""Tests for basic Broker functionality"""
def test_ack_and_no_ack(self):
@@ -57,37 +58,36 @@ class BrokerTests(TestBase):
"""
Test simple message delivery where consume is issued before publish
"""
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ session = self.session
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="test-queue", exchange="amq.fanout")
consumer_tag = "tag1"
- self.subscribe(queue="test-queue", destination=consumer_tag)
- queue = self.client.queue(consumer_tag)
+ session.message_subscribe(queue="test-queue", destination=consumer_tag)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
body = "Immediate Delivery"
- channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
+ session.message_transfer("amq.fanout", None, None, Message(body))
msg = queue.get(timeout=5)
self.assert_(msg.content.body == body)
- # TODO: Ensure we fail if immediate=True and there's no consumer.
-
-
def test_simple_delivery_queued(self):
"""
Test basic message delivery where publish is issued before consume
(i.e. requires queueing of the message)
"""
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ session = self.session
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="test-queue", exchange="amq.fanout")
body = "Queued Delivery"
- channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
+ session.message_transfer("amq.fanout", None, None, Message(body))
consumer_tag = "tag1"
- self.subscribe(queue="test-queue", destination=consumer_tag)
- queue = self.client.queue(consumer_tag)
+ session.message_subscribe(queue="test-queue", destination=consumer_tag)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
msg = queue.get(timeout=5)
self.assert_(msg.content.body == body)
diff --git a/python/tests_0-10/query.py b/python/tests_0-10/query.py
index eba2ee6dd1..0bbfb079cc 100644
--- a/python/tests_0-10/query.py
+++ b/python/tests_0-10/query.py
@@ -19,209 +19,212 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase010
-class QueryTests(TestBase):
- """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+class QueryTests(TestBase010):
+ """Tests for various query methods"""
+
+ def test_queue_query(self):
+ session = self.session
+ session.queue_declare(queue="my-queue", exclusive=True)
+ result = session.queue_query(queue="my-queue")
+ self.assertEqual("my-queue", result.queue)
def test_exchange_query(self):
"""
Test that the exchange_query method works as expected
"""
- channel = self.channel
+ session = self.session
#check returned type for the standard exchanges
- self.assert_type("direct", channel.exchange_query(name="amq.direct"))
- self.assert_type("topic", channel.exchange_query(name="amq.topic"))
- self.assert_type("fanout", channel.exchange_query(name="amq.fanout"))
- self.assert_type("headers", channel.exchange_query(name="amq.match"))
- self.assert_type("direct", channel.exchange_query(name=""))
+ self.assertEqual("direct", session.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", session.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", session.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", session.exchange_query(name="").type)
#declare an exchange
- channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
#check that the result of a query is as expected
- response = channel.exchange_query(name="my-test-exchange")
- self.assert_type("direct", response)
- self.assertEqual(False, response.durable)
- self.assertEqual(False, response.not_found)
+ response = session.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assert_(not response.durable)
+ self.assert_(not response.not_found)
#delete the exchange
- channel.exchange_delete(exchange="my-test-exchange")
+ session.exchange_delete(exchange="my-test-exchange")
#check that the query now reports not-found
- self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
-
- def assert_type(self, expected_type, response):
- self.assertEqual(expected_type, response.__getattr__("type"))
+ self.assert_(session.exchange_query(name="my-test-exchange").not_found)
- def test_binding_query_direct(self):
+ def test_exchange_bound_direct(self):
"""
- Test that the binding_query method works as expected with the direct exchange
+ Test that the exchange_bound method works as expected with the direct exchange
"""
- self.binding_query_with_key("amq.direct")
+ self.exchange_bound_with_key("amq.direct")
- def test_binding_query_topic(self):
+ def test_exchange_bound_topic(self):
"""
- Test that the binding_query method works as expected with the direct exchange
+ Test that the exchange_bound method works as expected with the direct exchange
"""
- self.binding_query_with_key("amq.topic")
+ self.exchange_bound_with_key("amq.topic")
- def binding_query_with_key(self, exchange_name):
- channel = self.channel
+ def exchange_bound_with_key(self, exchange_name):
+ session = self.session
#setup: create two queues
- channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+ session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key")
# test detection of any binding to specific queue
- response = channel.binding_query(exchange=exchange_name, queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
# test detection of specific binding to any queue
- response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.key_not_matched)
+ response = session.exchange_bound(exchange=exchange_name, binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.key_not_matched)
# test detection of specific binding to specific queue
- response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(False, response.key_not_matched)
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assert_(not response.key_not_matched)
# test unmatched queue, unspecified binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
# test unspecified queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.key_not_matched)
# test matched queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
self.assertEqual(True, response.key_not_matched)
# test unmatched queue, matched binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(False, response.key_not_matched)
+ self.assert_(not response.key_not_matched)
# test unmatched queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(True, response.key_not_matched)
#test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
#test queue not found
- self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+ self.assertEqual(True, session.exchange_bound(exchange=exchange_name, queue="unknown-queue").queue_not_found)
- def test_binding_query_fanout(self):
+ def test_exchange_bound_fanout(self):
"""
- Test that the binding_query method works as expected with fanout exchange
+ Test that the exchange_bound method works as expected with fanout exchange
"""
- channel = self.channel
+ session = self.session
#setup
- channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.fanout", queue="used-queue")
# test detection of any binding to specific queue
- response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
+ response = session.exchange_bound(exchange="amq.fanout", queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
# test unmatched queue, unspecified binding
- response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
#test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
#test queue not found
- self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+ self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
- def test_binding_query_header(self):
+ def test_exchange_bound_header(self):
"""
- Test that the binding_query method works as expected with headers exchanges
+ Test that the exchange_bound method works as expected with headers exchanges
"""
- channel = self.channel
+ session = self.session
#setup
- channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
# test detection of any binding to specific queue
- response = channel.binding_query(exchange="amq.match", queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
# test detection of specific binding to any queue
- response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.args_not_matched)
+ response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.args_not_matched)
# test detection of specific binding to specific queue
- response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(False, response.args_not_matched)
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assert_(not response.args_not_matched)
# test unmatched queue, unspecified binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
# test unspecified queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.args_not_matched)
# test matched queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
self.assertEqual(True, response.args_not_matched)
# test unmatched queue, matched binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(False, response.args_not_matched)
+ self.assert_(not response.args_not_matched)
# test unmatched queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(True, response.args_not_matched)
#test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
#test queue not found
- self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
+ self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found)
diff --git a/python/tests_0-10_preview/__init__.py b/python/tests_0-10_preview/__init__.py
new file mode 100644
index 0000000000..fe96d9e122
--- /dev/null
+++ b/python/tests_0-10_preview/__init__.py
@@ -0,0 +1,32 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from alternate_exchange import *
+from broker import *
+from dtx import *
+from example import *
+from exchange import *
+from execution import *
+from message import *
+from query import *
+from queue import *
+from testlib import *
+from tx import *
diff --git a/python/tests_0-10_preview/alternate_exchange.py b/python/tests_0-10_preview/alternate_exchange.py
new file mode 100644
index 0000000000..83f8d85811
--- /dev/null
+++ b/python/tests_0-10_preview/alternate_exchange.py
@@ -0,0 +1,179 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class AlternateExchangeTests(TestBase):
+ """
+ Tests for the new mechanism for message returns introduced in 0-10
+ and available in 0-9 for preview
+ """
+
+ def test_unroutable(self):
+ """
+ Test that unroutable messages are delivered to the alternate-exchange if specified
+ """
+ channel = self.channel
+ #create an exchange with an alternate defined
+ channel.exchange_declare(exchange="secondary", type="fanout")
+ channel.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary")
+
+ #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
+ channel.queue_declare(queue="returns", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue="returns", exchange="secondary")
+ self.subscribe(destination="a", queue="returns")
+ returned = self.client.queue("a")
+
+ #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
+ channel.queue_declare(queue="processed", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
+ self.subscribe(destination="b", queue="processed")
+ processed = self.client.queue("b")
+
+ #publish to the primary exchange
+ #...one message that makes it to the 'processed' queue:
+ channel.message_transfer(destination="primary", content=Content("Good", properties={'routing_key':"my-key"}))
+ #...and one that does not:
+ channel.message_transfer(destination="primary", content=Content("Bad", properties={'routing_key':"unused-key"}))
+
+ #delete the exchanges
+ channel.exchange_delete(exchange="primary")
+ channel.exchange_delete(exchange="secondary")
+
+ #verify behaviour
+ self.assertEqual("Good", processed.get(timeout=1).content.body)
+ self.assertEqual("Bad", returned.get(timeout=1).content.body)
+ self.assertEmpty(processed)
+ self.assertEmpty(returned)
+
+ def test_queue_delete(self):
+ """
+ Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
+ """
+ channel = self.channel
+ #set up a 'dead letter queue':
+ channel.exchange_declare(exchange="dlq", type="fanout")
+ channel.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="dlq", queue="deleted")
+ self.subscribe(destination="dlq", queue="deleted")
+ dlq = self.client.queue("dlq")
+
+ #create a queue using the dlq as its alternate exchange:
+ channel.queue_declare(queue="delete-me", alternate_exchange="dlq")
+ #send it some messages:
+ channel.message_transfer(content=Content("One", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("Two", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("Three", properties={'routing_key':"delete-me"}))
+ #delete it:
+ channel.queue_delete(queue="delete-me")
+ #delete the dlq exchange:
+ channel.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ self.assertEqual("One", dlq.get(timeout=1).content.body)
+ self.assertEqual("Two", dlq.get(timeout=1).content.body)
+ self.assertEqual("Three", dlq.get(timeout=1).content.body)
+ self.assertEmpty(dlq)
+
+
+ def test_immediate(self):
+ """
+ Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
+ """
+ channel = self.channel
+ #set up a 'dead letter queue':
+ channel.exchange_declare(exchange="dlq", type="fanout")
+ channel.queue_declare(queue="immediate", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="dlq", queue="immediate")
+ self.subscribe(destination="dlq", queue="immediate")
+ dlq = self.client.queue("dlq")
+
+ #create a queue using the dlq as its alternate exchange:
+ channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True, auto_delete=True)
+ #send it some messages:
+ #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK
+ channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"}))
+
+ #check the messages were delivered to the dlq:
+ self.assertEqual("no one wants me", dlq.get(timeout=1).content.body)
+ self.assertEmpty(dlq)
+
+ #cleanup:
+ channel.queue_delete(queue="no-consumers")
+ channel.exchange_delete(exchange="dlq")
+
+
+ def test_delete_while_used_by_queue(self):
+ """
+ Ensure an exchange still in use as an alternate-exchange for a
+ queue can't be deleted
+ """
+ channel = self.channel
+ channel.exchange_declare(exchange="alternate", type="fanout")
+ channel.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
+ try:
+ channel.exchange_delete(exchange="alternate")
+ self.fail("Expected deletion of in-use alternate-exchange to fail")
+ except Closed, e:
+ #cleanup:
+ other = self.connect()
+ channel = other.channel(1)
+ channel.session_open()
+ channel.exchange_delete(exchange="alternate")
+ channel.session_close()
+ other.close()
+
+ self.assertConnectionException(530, e.args[0])
+
+
+
+ def test_delete_while_used_by_exchange(self):
+ """
+ Ensure an exchange still in use as an alternate-exchange for
+ another exchange can't be deleted
+ """
+ channel = self.channel
+ channel.exchange_declare(exchange="alternate", type="fanout")
+ channel.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate")
+ try:
+ channel.exchange_delete(exchange="alternate")
+ #cleanup:
+ channel.exchange_delete(exchange="e")
+ self.fail("Expected deletion of in-use alternate-exchange to fail")
+ except Closed, e:
+ #cleanup:
+ other = self.connect()
+ channel = other.channel(1)
+ channel.session_open()
+ channel.exchange_delete(exchange="e")
+ channel.exchange_delete(exchange="alternate")
+ channel.session_close()
+ other.close()
+
+ self.assertConnectionException(530, e.args[0])
+
+
+ def assertEmpty(self, queue):
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Queue not empty: " + msg)
+ except Empty: None
+
diff --git a/python/tests_0-10_preview/broker.py b/python/tests_0-10_preview/broker.py
new file mode 100644
index 0000000000..99936ba742
--- /dev/null
+++ b/python/tests_0-10_preview/broker.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class BrokerTests(TestBase):
+ """Tests for basic Broker functionality"""
+
+ def test_ack_and_no_ack(self):
+ """
+ First, this test tries to receive a message with a no-ack
+ consumer. Second, this test tries to explicitly receive and
+ acknowledge a message with an acknowledging consumer.
+ """
+ ch = self.channel
+ self.queue_declare(ch, queue = "myqueue")
+
+ # No ack consumer
+ ctag = "tag1"
+ self.subscribe(ch, queue = "myqueue", destination = ctag)
+ body = "test no-ack"
+ ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ self.assert_(msg.content.body == body)
+
+ # Acknowledging consumer
+ self.queue_declare(ch, queue = "otherqueue")
+ ctag = "tag2"
+ self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1)
+ ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
+ ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
+ body = "test ack"
+ ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ msg.complete()
+ self.assert_(msg.content.body == body)
+
+ def test_simple_delivery_immediate(self):
+ """
+ Test simple message delivery where consume is issued before publish
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ consumer_tag = "tag1"
+ self.subscribe(queue="test-queue", destination=consumer_tag)
+ queue = self.client.queue(consumer_tag)
+
+ body = "Immediate Delivery"
+ channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ # TODO: Ensure we fail if immediate=True and there's no consumer.
+
+
+ def test_simple_delivery_queued(self):
+ """
+ Test basic message delivery where publish is issued before consume
+ (i.e. requires queueing of the message)
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ body = "Queued Delivery"
+ channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
+
+ consumer_tag = "tag1"
+ self.subscribe(queue="test-queue", destination=consumer_tag)
+ queue = self.client.queue(consumer_tag)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ def test_invalid_channel(self):
+ channel = self.client.channel(200)
+ try:
+ channel.queue_declare(exclusive=True)
+ self.fail("Expected error on queue_declare for invalid channel")
+ except Closed, e:
+ self.assertConnectionException(504, e.args[0])
+
+ def test_closed_channel(self):
+ channel = self.client.channel(200)
+ channel.session_open()
+ channel.session_close()
+ try:
+ channel.queue_declare(exclusive=True)
+ self.fail("Expected error on queue_declare for closed channel")
+ except Closed, e:
+ if isinstance(e.args[0], str): self.fail(e)
+ self.assertConnectionException(504, e.args[0])
diff --git a/python/tests_0-10_preview/dtx.py b/python/tests_0-10_preview/dtx.py
new file mode 100644
index 0000000000..f84f91c75a
--- /dev/null
+++ b/python/tests_0-10_preview/dtx.py
@@ -0,0 +1,645 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from struct import pack, unpack
+from time import sleep
+
+class DtxTests(TestBase):
+ """
+ Tests for the amqp dtx related classes.
+
+ Tests of the form test_simple_xxx test the basic transactional
+ behaviour. The approach here is to 'swap' a message from one queue
+ to another by consuming and re-publishing in the same
+ transaction. That transaction is then completed in different ways
+ and the appropriate result verified.
+
+ The other tests enforce more specific rules and behaviour on a
+ per-method or per-field basis.
+ """
+
+ XA_RBROLLBACK = 1
+ XA_RBTIMEOUT = 2
+ XA_OK = 0
+ tx_counter = 0
+
+ def reset_channel(self):
+ self.channel.session_close()
+ self.channel = self.client.channel(self.channel.id + 1)
+ self.channel.session_open()
+
+ def test_simple_commit(self):
+ """
+ Test basic one-phase commit behaviour.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "commit")
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #commit
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+
+ #should close and reopen channel to ensure no unacked messages are held
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(1, "queue-b")
+ self.assertMessageId("commit", "queue-b")
+
+ def test_simple_prepare_commit(self):
+ """
+ Test basic two-phase commit behaviour.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "prepare-commit")
+
+ #prepare
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #commit
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(1, "queue-b")
+ self.assertMessageId("prepare-commit", "queue-b")
+
+
+ def test_simple_rollback(self):
+ """
+ Test basic rollback behaviour.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "rollback")
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #rollback
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("rollback", "queue-a")
+
+ def test_simple_prepare_rollback(self):
+ """
+ Test basic rollback behaviour after the transaction has been prepared.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "prepare-rollback")
+
+ #prepare
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #rollback
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("prepare-rollback", "queue-a")
+
+ def test_select_required(self):
+ """
+ check that an error is flagged if select is not issued before
+ start or end
+ """
+ channel = self.channel
+ tx = self.xid("dummy")
+ try:
+ channel.dtx_demarcation_start(xid=tx)
+
+ #if we get here we have failed, but need to do some cleanup:
+ channel.dtx_demarcation_end(xid=tx)
+ channel.dtx_coordination_rollback(xid=tx)
+ self.fail("Channel not selected for use with dtx, expected exception!")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_start_already_known(self):
+ """
+ Verify that an attempt to start an association with a
+ transaction that is already known is not allowed (unless the
+ join flag is set).
+ """
+ #create two channels on different connection & select them for use with dtx:
+ channel1 = self.channel
+ channel1.dtx_demarcation_select()
+
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.session_open()
+ channel2.dtx_demarcation_select()
+
+ #create a xid
+ tx = self.xid("dummy")
+ #start work on one channel under that xid:
+ channel1.dtx_demarcation_start(xid=tx)
+ #then start on the other without the join set
+ failed = False
+ try:
+ channel2.dtx_demarcation_start(xid=tx)
+ except Closed, e:
+ failed = True
+ error = e
+
+ #cleanup:
+ if not failed:
+ channel2.dtx_demarcation_end(xid=tx)
+ other.close()
+ channel1.dtx_demarcation_end(xid=tx)
+ channel1.dtx_coordination_rollback(xid=tx)
+
+ #verification:
+ if failed: self.assertConnectionException(503, e.args[0])
+ else: self.fail("Xid already known, expected exception!")
+
+ def test_forget_xid_on_completion(self):
+ """
+ Verify that a xid is 'forgotten' - and can therefore be used
+ again - once it is completed.
+ """
+ #do some transactional work & complete the transaction
+ self.test_simple_commit()
+ # channel has been reset, so reselect for use with dtx
+ self.channel.dtx_demarcation_select()
+
+ #start association for the same xid as the previously completed txn
+ tx = self.xid("my-xid")
+ self.channel.dtx_demarcation_start(xid=tx)
+ self.channel.dtx_demarcation_end(xid=tx)
+ self.channel.dtx_coordination_rollback(xid=tx)
+
+ def test_start_join_and_resume(self):
+ """
+ Ensure the correct error is signalled when both the join and
+ resume flags are set on starting an association between a
+ channel and a transcation.
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ try:
+ channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+ #failed, but need some cleanup:
+ channel.dtx_demarcation_end(xid=tx)
+ channel.dtx_coordination_rollback(xid=tx)
+ self.fail("Join and resume both set, expected exception!")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_start_join(self):
+ """
+ Verify 'join' behaviour, where a channel is associated with a
+ transaction that is already associated with another channel.
+ """
+ #create two channels & select them for use with dtx:
+ channel1 = self.channel
+ channel1.dtx_demarcation_select()
+
+ channel2 = self.client.channel(2)
+ channel2.session_open()
+ channel2.dtx_demarcation_select()
+
+ #setup
+ channel1.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ channel1.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+ channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+
+ #create a xid
+ tx = self.xid("dummy")
+ #start work on one channel under that xid:
+ channel1.dtx_demarcation_start(xid=tx)
+ #then start on the other with the join flag set
+ channel2.dtx_demarcation_start(xid=tx, join=True)
+
+ #do work through each channel
+ self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
+ self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+
+ #mark end on both channels
+ channel1.dtx_demarcation_end(xid=tx)
+ channel2.dtx_demarcation_end(xid=tx)
+
+ #commit and check
+ channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+
+ def test_suspend_resume(self):
+ """
+ Test suspension and resumption of an association
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+
+ #setup
+ channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+ channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+
+ tx = self.xid("dummy")
+
+ channel.dtx_demarcation_start(xid=tx)
+ self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+ channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+ channel.dtx_demarcation_start(xid=tx, resume=True)
+ self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+ channel.dtx_demarcation_end(xid=tx)
+
+ #commit and check
+ channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+ def test_suspend_start_end_resume(self):
+ """
+ Test suspension and resumption of an association with work
+ done on another transaction when the first transaction is
+ suspended
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+
+ #setup
+ channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+ channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+
+ tx = self.xid("dummy")
+
+ channel.dtx_demarcation_start(xid=tx)
+ self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+ channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+ channel.dtx_demarcation_start(xid=tx, resume=True)
+ self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+ channel.dtx_demarcation_end(xid=tx)
+
+ #commit and check
+ channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+ def test_end_suspend_and_fail(self):
+ """
+ Verify that the correct error is signalled if the suspend and
+ fail flag are both set when disassociating a transaction from
+ the channel
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+ tx = self.xid("suspend_and_fail")
+ channel.dtx_demarcation_start(xid=tx)
+ try:
+ channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+ self.fail("Suspend and fail both set, expected exception!")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ #cleanup
+ other = self.connect()
+ channel = other.channel(1)
+ channel.session_open()
+ channel.dtx_coordination_rollback(xid=tx)
+ channel.session_close()
+ other.close()
+
+
+ def test_end_unknown_xid(self):
+ """
+ Verifies that the correct exception is thrown when an attempt
+ is made to end the association for a xid not previously
+ associated with the channel
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+ tx = self.xid("unknown-xid")
+ try:
+ channel.dtx_demarcation_end(xid=tx)
+ self.fail("Attempted to end association with unknown xid, expected exception!")
+ except Closed, e:
+ #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
+ self.assertConnectionException(503, e.args[0])
+
+ def test_end(self):
+ """
+ Verify that the association is terminated by end and subsequent
+ operations are non-transactional
+ """
+ channel = self.client.channel(2)
+ channel.session_open()
+ channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
+
+ #publish a message under a transaction
+ channel.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ channel.dtx_demarcation_start(xid=tx)
+ channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage"))
+ channel.dtx_demarcation_end(xid=tx)
+
+ #now that association with txn is ended, publish another message
+ channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage"))
+
+ #check the second message is available, but not the first
+ self.assertMessageCount(1, "tx-queue")
+ self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
+ msg = self.client.queue("results").get(timeout=1)
+ self.assertEqual("two", msg.content['message_id'])
+ channel.message_cancel(destination="results")
+ #ack the message then close the channel
+ msg.complete()
+ channel.session_close()
+
+ channel = self.channel
+ #commit the transaction and check that the first message (and
+ #only the first message) is then delivered
+ channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "tx-queue")
+ self.assertMessageId("one", "tx-queue")
+
+ def test_invalid_commit_one_phase_true(self):
+ """
+ Test that a commit with one_phase = True is rejected if the
+ transaction in question has already been prepared.
+ """
+ other = self.connect()
+ tester = other.channel(1)
+ tester.session_open()
+ tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ tester.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ tester.dtx_demarcation_start(xid=tx)
+ tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+ tester.dtx_demarcation_end(xid=tx)
+ tester.dtx_coordination_prepare(xid=tx)
+ failed = False
+ try:
+ tester.dtx_coordination_commit(xid=tx, one_phase=True)
+ except Closed, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.channel.dtx_coordination_rollback(xid=tx)
+ self.assertConnectionException(503, e.args[0])
+ else:
+ tester.session_close()
+ other.close()
+ self.fail("Invalid use of one_phase=True, expected exception!")
+
+ def test_invalid_commit_one_phase_false(self):
+ """
+ Test that a commit with one_phase = False is rejected if the
+ transaction in question has not yet been prepared.
+ """
+ """
+ Test that a commit with one_phase = True is rejected if the
+ transaction in question has already been prepared.
+ """
+ other = self.connect()
+ tester = other.channel(1)
+ tester.session_open()
+ tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ tester.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ tester.dtx_demarcation_start(xid=tx)
+ tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+ tester.dtx_demarcation_end(xid=tx)
+ failed = False
+ try:
+ tester.dtx_coordination_commit(xid=tx, one_phase=False)
+ except Closed, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.channel.dtx_coordination_rollback(xid=tx)
+ self.assertConnectionException(503, e.args[0])
+ else:
+ tester.session_close()
+ other.close()
+ self.fail("Invalid use of one_phase=False, expected exception!")
+
+ def test_implicit_end(self):
+ """
+ Test that an association is implicitly ended when the channel
+ is closed (whether by exception or explicit client request)
+ and the transaction in question is marked as rollback only.
+ """
+ channel1 = self.channel
+ channel2 = self.client.channel(2)
+ channel2.session_open()
+
+ #setup:
+ channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+ tx = self.xid("dummy")
+
+ channel2.dtx_demarcation_select()
+ channel2.dtx_demarcation_start(xid=tx)
+ channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1)
+ channel2.message_flow(destination="dummy", unit=0, value=1)
+ channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
+ self.client.queue("dummy").get(timeout=1).complete()
+ channel2.message_cancel(destination="dummy")
+ channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+ channel2.session_close()
+
+ self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
+ channel1.dtx_coordination_rollback(xid=tx)
+
+ def test_get_timeout(self):
+ """
+ Check that get-timeout returns the correct value, (and that a
+ transaction with a timeout can complete normally)
+ """
+ channel = self.channel
+ tx = self.xid("dummy")
+
+ channel.dtx_demarcation_select()
+ channel.dtx_demarcation_start(xid=tx)
+ self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+ channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
+ self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+ self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+ def test_set_timeout(self):
+ """
+ Test the timeout of a transaction results in the expected
+ behaviour
+ """
+ #open new channel to allow self.channel to be used in checking te queue
+ channel = self.client.channel(2)
+ channel.session_open()
+ #setup:
+ tx = self.xid("dummy")
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
+
+ channel.dtx_demarcation_select()
+ channel.dtx_demarcation_start(xid=tx)
+ self.swap(channel, "queue-a", "queue-b")
+ channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+ sleep(3)
+ #check that the work has been rolled back already
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("timeout", "queue-a")
+ #check the correct codes are returned when we try to complete the txn
+ self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status)
+
+
+
+ def test_recover(self):
+ """
+ Test basic recover behaviour
+ """
+ channel = self.channel
+
+ channel.dtx_demarcation_select()
+ channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+
+ prepared = []
+ for i in range(1, 10):
+ tx = self.xid("tx%s" % (i))
+ channel.dtx_demarcation_start(xid=tx)
+ channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i)))
+ channel.dtx_demarcation_end(xid=tx)
+ if i in [2, 5, 6, 8]:
+ channel.dtx_coordination_prepare(xid=tx)
+ prepared.append(tx)
+ else:
+ channel.dtx_coordination_rollback(xid=tx)
+
+ xids = channel.dtx_coordination_recover().in_doubt
+
+ #rollback the prepared transactions returned by recover
+ for x in xids:
+ channel.dtx_coordination_rollback(xid=x)
+
+ #validate against the expected list of prepared transactions
+ actual = set(xids)
+ expected = set(prepared)
+ intersection = actual.intersection(expected)
+
+ if intersection != expected:
+ missing = expected.difference(actual)
+ extra = actual.difference(expected)
+ for x in missing:
+ channel.dtx_coordination_rollback(xid=x)
+ self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+ def test_bad_resume(self):
+ """
+ Test that a resume on a session not selected for use with dtx fails
+ """
+ channel = self.channel
+ try:
+ channel.dtx_demarcation_start(resume=True)
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def xid(self, txid):
+ DtxTests.tx_counter += 1
+ branchqual = "v%s" % DtxTests.tx_counter
+ return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+
+ def txswap(self, tx, id):
+ channel = self.channel
+ #declare two queues:
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ #put message with specified id on one queue:
+ channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
+
+ #start the transaction:
+ channel.dtx_demarcation_select()
+ self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
+
+ #'swap' the message from one queue to the other, under that transaction:
+ self.swap(self.channel, "queue-a", "queue-b")
+
+ #mark the end of the transactional work:
+ self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+
+ def swap(self, channel, src, dest):
+ #consume from src:
+ channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
+ channel.message_flow(destination="temp-swap", unit=0, value=1)
+ channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+ msg = self.client.queue("temp-swap").get(timeout=1)
+ channel.message_cancel(destination="temp-swap")
+ msg.complete();
+
+ #re-publish to dest
+ channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']},
+ body=msg.content.body))
+
+ def assertMessageCount(self, expected, queue):
+ self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
+
+ def assertMessageId(self, expected, queue):
+ self.channel.message_subscribe(queue=queue, destination="results")
+ self.channel.message_flow(destination="results", unit=0, value=1)
+ self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
+ self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
+ self.channel.message_cancel(destination="results")
diff --git a/python/tests_0-10_preview/example.py b/python/tests_0-10_preview/example.py
new file mode 100644
index 0000000000..da5ee2441f
--- /dev/null
+++ b/python/tests_0-10_preview/example.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExampleTest (TestBase):
+ """
+ An example Qpid test, illustrating the unittest framework and the
+ python Qpid client. The test class must inherit TestBase. The
+ test code uses the Qpid client to interact with a qpid broker and
+ verify it behaves as expected.
+ """
+
+ def test_example(self):
+ """
+ An example test. Note that test functions must start with 'test_'
+ to be recognized by the test framework.
+ """
+
+ # By inheriting TestBase, self.client is automatically connected
+ # and self.channel is automatically opened as channel(1)
+ # Other channel methods mimic the protocol.
+ channel = self.channel
+
+ # Now we can send regular commands. If you want to see what the method
+ # arguments mean or what other commands are available, you can use the
+ # python builtin help() method. For example:
+ #help(chan)
+ #help(chan.exchange_declare)
+
+ # If you want browse the available protocol methods without being
+ # connected to a live server you can use the amqp-doc utility:
+ #
+ # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+ #
+ # Options:
+ # -e, --regexp use regex instead of glob when matching
+
+ # Now that we know what commands are available we can use them to
+ # interact with the server.
+
+ # Here we use ordinal arguments.
+ self.exchange_declare(channel, 0, "test", "direct")
+
+ # Here we use keyword arguments.
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+ # Call Channel.basic_consume to register as a consumer.
+ # All the protocol methods return a message object. The message object
+ # has fields corresponding to the reply method fields, plus a content
+ # field that is filled if the reply includes content. In this case the
+ # interesting field is the consumer_tag.
+ channel.message_subscribe(queue="test-queue", destination="consumer_tag")
+ channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
+
+ # We can use the Client.queue(...) method to access the queue
+ # corresponding to our consumer_tag.
+ queue = self.client.queue("consumer_tag")
+
+ # Now lets publish a message and see if our consumer gets it. To do
+ # this we need to import the Content class.
+ sent = Content("Hello World!")
+ sent["routing_key"] = "key"
+ channel.message_transfer(destination="test", content=sent)
+
+ # Now we'll wait for the message to arrive. We can use the timeout
+ # argument in case the server hangs. By default queue.get() will wait
+ # until a message arrives or the connection to the server dies.
+ msg = queue.get(timeout=10)
+
+ # And check that we got the right response with assertEqual
+ self.assertEqual(sent.body, msg.content.body)
+
+ # Now acknowledge the message.
+ msg.complete()
+
diff --git a/python/tests_0-10_preview/exchange.py b/python/tests_0-10_preview/exchange.py
new file mode 100644
index 0000000000..86c39b7736
--- /dev/null
+++ b/python/tests_0-10_preview/exchange.py
@@ -0,0 +1,335 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging
+from qpid.testlib import TestBase
+from qpid.content import Content
+from qpid.client import Closed
+
+
+class StandardExchangeVerifier:
+ """Verifies standard exchange behavior.
+
+ Used as base class for classes that test standard exchanges."""
+
+ def verifyDirectExchange(self, ex):
+ """Verify that ex behaves like a direct exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+ try:
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+ self.fail("Expected Empty exception")
+ except Queue.Empty: None # Expected
+
+ def verifyFanOutExchange(self, ex):
+ """Verify that ex behaves like a fanout exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex)
+ self.queue_declare(queue="p")
+ self.channel.queue_bind(queue="p", exchange=ex)
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+ def verifyTopicExchange(self, ex):
+ """Verify that ex behaves like a topic exchange"""
+ self.queue_declare(queue="a")
+ self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+ # Shouldn't match
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"}))
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"}))
+ self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
+ self.assert_(q.empty())
+
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties=headers)
+ self.channel.message_transfer(destination=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server SHOULD implement these standard exchange types: topic, headers.
+
+ Client attempts to declare an exchange with each of these standard types.
+ """
+
+ def testDirect(self):
+ """Declare and test a direct exchange"""
+ self.exchange_declare(0, exchange="d", type="direct")
+ self.verifyDirectExchange("d")
+
+ def testFanout(self):
+ """Declare and test a fanout exchange"""
+ self.exchange_declare(0, exchange="f", type="fanout")
+ self.verifyFanOutExchange("f")
+
+ def testTopic(self):
+ """Declare and test a topic exchange"""
+ self.exchange_declare(0, exchange="t", type="topic")
+ self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
+
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST, in each virtual host, pre-declare an exchange instance
+ for each standard exchange type that it implements, where the name of the
+ exchange instance is amq. followed by the exchange type name.
+
+ Client creates a temporary queue and attempts to bind to each required
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
+ those types are defined).
+ """
+ def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+ def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+ def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
+
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST predeclare a direct exchange to act as the default exchange
+ for content Publish methods and for default queue bindings.
+
+ Client checks that the default exchange is active by specifying a queue
+ binding with no exchange name, and publishing a message with a suitable
+ routing key but without specifying the exchange name, then ensuring that
+ the message arrives in the queue correctly.
+ """
+ def testDefaultExchange(self):
+ # Test automatic binding by queue name.
+ self.queue_declare(queue="d")
+ self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestBase):
+ """
+ The server MUST NOT allow clients to access the default exchange except
+ by specifying an empty exchange name in the Queue.Bind and content Publish
+ methods.
+ """
+
+class ExtensionsRuleTests(TestBase):
+ """
+ The server MAY implement other exchange types as wanted.
+ """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+ """
+ The server SHOULD support a minimum of 16 exchanges per virtual host and
+ ideally, impose no limit except as defined by available resources.
+
+ The client creates as many exchanges as it can until the server reports
+ an error; the number of exchanges successfuly created must be at least
+ sixteen.
+ """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access to
+ the realm in which the exchange exists or will be created, or "passive"
+ access if the if-exists flag is set.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+ """
+ Exchange names starting with "amq." are reserved for predeclared and
+ standardised exchanges. The client MUST NOT attempt to create an exchange
+ starting with "amq.".
+
+
+ """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+ """
+ Exchanges cannot be redeclared with different types. The client MUST not
+ attempt to redeclare an existing exchange with a different type than used
+ in the original Exchange.Declare method.
+
+
+ """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to create an exchange with a type that the
+ server does not support.
+
+
+ """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+ """
+ If set, and the exchange does not already exist, the server MUST raise a
+ channel exception with reply code 404 (not found).
+ """
+ def test(self):
+ try:
+ self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+ self.fail("Expected 404 for passive declaration of unknown exchange.")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+ """
+ The server MUST support both durable and transient exchanges.
+
+
+ """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the durable field if the exchange already exists.
+
+
+ """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the auto-delete field if the exchange already
+ exists.
+
+
+ """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access
+ rights to the exchange's access realm.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to delete an exchange that does not exist.
+ """
+
+
+class HeadersExchangeTests(TestBase):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestBase.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
+
+ def myBasicPublish(self, headers):
+ self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
+
+ def testMatchAll(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestBase):
+ """
+ Test some miscellaneous error conditions
+ """
+ def testTypeNotKnown(self):
+ try:
+ self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+ self.fail("Expected 503 for declaration of unknown exchange type.")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def testDifferentDeclaredType(self):
+ self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+ try:
+ self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+ self.fail("Expected 530 for redeclaration of exchange with different type.")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+ #cleanup
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.session_open()
+ c2.exchange_delete(exchange="test_different_declared_type_exchange")
+
+class ExchangeTests(TestBase):
+ def testHeadersBindNoMatchArg(self):
+ self.channel.queue_declare(queue="q", exclusive=True, auto_delete=True)
+ try:
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
+ self.fail("Expected failure for missing x-match arg.")
+ except Closed, e:
+ self.assertConnectionException(541, e.args[0])
diff --git a/python/tests_0-10_preview/execution.py b/python/tests_0-10_preview/execution.py
new file mode 100644
index 0000000000..3ff6d8ea65
--- /dev/null
+++ b/python/tests_0-10_preview/execution.py
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExecutionTests (TestBase):
+ def test_flush(self):
+ channel = self.channel
+ for i in [1, 2, 3]:
+ channel.message_transfer(
+ content=Content(properties={'routing_key':str(i)}))
+ assert(channel.completion.wait(channel.completion.command_id, timeout=1))
diff --git a/python/tests_0-10_preview/message.py b/python/tests_0-10_preview/message.py
new file mode 100644
index 0000000000..a3d32bdb2d
--- /dev/null
+++ b/python/tests_0-10_preview/message.py
@@ -0,0 +1,834 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from qpid.reference import Reference, ReferenceId
+
+class MessageTests(TestBase):
+ """Tests for 'methods' on the amqp message 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+ def test_consume_no_local_awkward(self):
+
+ """
+ If an exclusive queue gets a no-local delivered to it, that
+ message could 'block' delivery of subsequent messages or it
+ could be left on the queue, possibly never being consumed
+ (this is the case for example in the qpid JMS mapping of
+ topics). This test excercises a Qpid C++ broker hack that
+ deletes such messages.
+ """
+
+ channel = self.channel
+ #setup:
+ channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ #establish consumer which excludes delivery of locally sent messages
+ self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
+
+ #send a 'local' message
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
+
+ #send a non local message
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.session_open()
+ channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
+ channel2.session_close()
+ other.close()
+
+ #check that the second message only is delivered
+ excluded = self.client.queue("local_excluded")
+ msg = excluded.get(timeout=1)
+ self.assertEqual("foreign", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received extra message")
+ except Empty: None
+ #check queue is empty
+ self.assertEqual(0, channel.queue_query(queue="test-queue").message_count)
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ try:
+ self.subscribe(destination="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.session_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ self.subscribe(channel, destination="first", queue="test-queue-2")
+ try:
+ self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ self.subscribe(queue="invalid-queue", destination="")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.session_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ self.subscribe(channel, queue="", destination="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ self.subscribe(destination="first", queue="test-queue-3")
+ try:
+ self.subscribe(destination="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+ self.subscribe(destination="my-consumer", queue="test-queue-4")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
+
+ #cancel should stop messages being delivered
+ channel.message_cancel(destination="my-consumer")
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
+ myqueue = self.client.queue("my-consumer")
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.content.body)
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ channel.message_cancel(destination="my-consumer")
+ channel.message_cancel(destination="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
+
+ self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ msg2.complete(cumulative=True)#One and Two
+ msg4.complete(cumulative=False)
+
+ channel.message_recover(requeue=False)
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+
+ def test_recover(self):
+ """
+ Test recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-a")
+ channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+
+ self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
+ self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
+ confirmed = self.client.queue("confirmed")
+ unconfirmed = self.client.queue("unconfirmed")
+
+ data = ["One", "Two", "Three", "Four", "Five"]
+ for d in data:
+ channel.message_transfer(destination="amq.fanout", content=Content(body=d))
+
+ for q in [confirmed, unconfirmed]:
+ for d in data:
+ self.assertEqual(d, q.get(timeout=1).content.body)
+ self.assertEmpty(q)
+
+ channel.message_recover(requeue=False)
+
+ self.assertEmpty(confirmed)
+
+ while len(data):
+ msg = None
+ for d in data:
+ msg = unconfirmed.get(timeout=1)
+ self.assertEqual(d, msg.content.body)
+ self.assertEqual(True, msg.content['redelivered'])
+ self.assertEmpty(unconfirmed)
+ data.remove(msg.content.body)
+ msg.complete(cumulative=False)
+ channel.message_recover(requeue=False)
+
+
+ def test_recover_requeue(self):
+ """
+ Test requeing on recovery
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
+
+ self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ msg2.complete(cumulative=True) #One and Two
+ msg4.complete(cumulative=False) #Four
+
+ channel.message_cancel(destination="consumer_tag")
+
+ #publish a new message
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
+ #requeue unacked messages (Three and Five)
+ channel.message_recover(requeue=True)
+
+ self.subscribe(queue="test-requeue", destination="consumer_tag")
+ queue2 = self.client.queue("consumer_tag")
+
+ msg3b = queue2.get(timeout=1)
+ msg5b = queue2.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ self.assertEqual(True, msg3b.content['redelivered'])
+ self.assertEqual(True, msg5b.content['redelivered'])
+
+ self.assertEqual("Six", queue2.get(timeout=1).content.body)
+
+ try:
+ extra = queue2.get(timeout=1)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
+ except Empty: None
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
+ subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ #set prefetch to 5:
+ channel.message_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ msg.complete()
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg.complete()
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
+ subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
+ queue = self.client.queue("consumer_tag")
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.message_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ msg.complete()
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg.complete()
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.content.body)
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+ channel.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue = "r", exchange = "amq.fanout")
+
+ self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.content.body, "blah, blah")
+ channel.message_reject([msg.command_id, msg.command_id])
+
+ self.subscribe(queue = "r", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.content.body, "blah, blah")
+
+ def test_credit_flow_messages(self):
+ """
+ Test basic credit based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ channel.message_flow(unit = 0, value = 1, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ def test_credit_flow_bytes(self):
+ """
+ Test basic credit based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 35
+
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(5):
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(5):
+ channel.message_flow(unit = 1, value = msg_size, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
+ def test_window_flow_messages(self):
+ """
+ Test basic window based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ self.assertDataEquals(channel, msg, "Message %d" % i)
+ self.assertEmpty(q)
+
+ #acknowledge messages and check more are received
+ msg.complete(cumulative=True)
+ for i in range(6, 11):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+
+ def test_window_flow_bytes(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 40
+
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ msgs = []
+ for i in range(5):
+ msg = q.get(timeout = 1)
+ msgs.append(msg)
+ self.assertDataEquals(channel, msg, "abcdefgh")
+ self.assertEmpty(q)
+
+ #ack each message individually and check more are received
+ for i in range(5):
+ msg = msgs.pop()
+ msg.complete(cumulative=False)
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ def test_subscribe_not_acquired(self):
+ """
+ Test the not-acquired modes works as expected for a simple case
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 6):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+
+ for i in range(6, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ #both subscribers should see all messages
+ qA = self.client.queue("a")
+ qB = self.client.queue("b")
+ for i in range(1, 11):
+ for q in [qA, qB]:
+ msg = q.get(timeout = 1)
+ self.assertEquals("Message %s" % i, msg.content.body)
+ msg.complete()
+
+ #messages should still be on the queue:
+ self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+
+ def test_acquire(self):
+ """
+ Test explicit acquire function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ #message should still be on the queue:
+ self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
+ channel.message_acquire([msg.command_id, msg.command_id])
+ #check that we get notification (i.e. message_acquired)
+ response = channel.control_queue.get(timeout=1)
+ self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ #message should have been removed from the queue:
+ self.assertEquals(0, channel.queue_query(queue = "q").message_count)
+ msg.complete()
+
+
+
+
+ def test_release(self):
+ """
+ Test explicit release function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ channel.message_cancel(destination = "a")
+ channel.message_release([msg.command_id, msg.command_id])
+ msg.complete()
+
+ #message should not have been removed from the queue:
+ self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
+ def test_release_ordering(self):
+ """
+ Test order of released messages is as expected
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range (1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i)))
+
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ queue = self.client.queue("a")
+ first = queue.get(timeout = 1)
+ for i in range (2, 10):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+ last = queue.get(timeout = 1)
+ self.assertEmpty(queue)
+ channel.message_release([first.command_id, last.command_id])
+ last.complete()#will re-allocate credit, as in window mode
+ for i in range (1, 11):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+
+ def test_ranged_ack(self):
+ """
+ Test acking of messages ranges
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range (1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i)))
+
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ queue = self.client.queue("a")
+ for i in range (1, 11):
+ self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body)
+ self.assertEmpty(queue)
+
+ #ack all but the third message (command id 2)
+ channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
+ channel.message_recover()
+ self.assertEquals("message 3", queue.get(timeout = 1).content.body)
+ self.assertEmpty(queue)
+
+ def test_subscribe_not_acquired_2(self):
+ channel = self.channel
+
+ #publish some messages
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+ #consume some of them
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ channel.message_flow_mode(mode = 0, destination = "a")
+ channel.message_flow(unit = 0, value = 5, destination = "a")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+
+ queue = self.client.queue("a")
+ for i in range(1, 6):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ self.assertEmpty(queue)
+
+ #now create a not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+
+ #check it gets those not consumed
+ queue = self.client.queue("b")
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ for i in range(6, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+ def test_subscribe_not_acquired_3(self):
+ channel = self.channel
+
+ #publish some messages
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+ #create a not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+
+ #browse through messages
+ queue = self.client.queue("a")
+ for i in range(1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ if (i % 2):
+ #try to acquire every second message
+ channel.message_acquire([msg.command_id, msg.command_id])
+ #check that acquire succeeds
+ response = channel.control_queue.get(timeout=1)
+ self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ msg.complete()
+ self.assertEmpty(queue)
+
+ #create a second not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ #check it gets those not consumed
+ queue = self.client.queue("b")
+ for i in [2,4,6,8,10]:
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+ def test_release_unacquired(self):
+ channel = self.channel
+
+ #create queue
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True)
+
+ #send message
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ #create two 'browsers'
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ queueA = self.client.queue("a")
+
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 10, destination = "b")
+ queueB = self.client.queue("b")
+
+ #have each browser release the message
+ msgA = queueA.get(timeout = 1)
+ channel.message_release([msgA.command_id, msgA.command_id])
+
+ msgB = queueB.get(timeout = 1)
+ channel.message_release([msgB.command_id, msgB.command_id])
+
+ #cancel browsers
+ channel.message_cancel(destination = "a")
+ channel.message_cancel(destination = "b")
+
+ #create consumer
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ channel.message_flow(unit = 0, value = 10, destination = "c")
+ queueC = self.client.queue("c")
+ #consume the message then ack it
+ msgC = queueC.get(timeout = 1)
+ msgC.complete()
+ #ensure there are no other messages
+ self.assertEmpty(queueC)
+
+ def test_no_size(self):
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ ch = self.channel
+ ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body"))
+
+ ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
+ ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d")
+ ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d")
+
+ queue = self.client.queue("d")
+ msg = queue.get(timeout = 3)
+ self.assertEquals("message-body", msg.content.body)
+
+ def assertDataEquals(self, channel, msg, expected):
+ self.assertEquals(expected, msg.content.body)
+
+ def assertEmpty(self, queue):
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Queue not empty, contains: " + extra.content.body)
+ except Empty: None
+
+class SizelessContent(Content):
+
+ def size(self):
+ return None
diff --git a/python/tests_0-10_preview/persistence.py b/python/tests_0-10_preview/persistence.py
new file mode 100644
index 0000000000..ad578474eb
--- /dev/null
+++ b/python/tests_0-10_preview/persistence.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class PersistenceTests(TestBase):
+ def test_delete_queue_after_publish(self):
+ channel = self.channel
+ channel.synchronous = False
+
+ #create queue
+ channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ channel.synchronous = True
+ #explicitly delete queue
+ channel.queue_delete(queue = "q")
+
+ def test_ack_message_from_deleted_queue(self):
+ channel = self.channel
+ channel.synchronous = False
+
+ #create queue
+ channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ #create consumer
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ queue = self.client.queue("a")
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ channel.message_cancel(destination = "a")
+ msg.complete()
+
+ def test_queue_deletion(self):
+ channel = self.channel
+ channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
+ channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
+ channel.queue_delete(queue = "durable-subscriber-queue")
+
diff --git a/python/tests_0-10_preview/query.py b/python/tests_0-10_preview/query.py
new file mode 100644
index 0000000000..eba2ee6dd1
--- /dev/null
+++ b/python/tests_0-10_preview/query.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueryTests(TestBase):
+ """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ channel = self.channel
+ #check returned type for the standard exchanges
+ self.assert_type("direct", channel.exchange_query(name="amq.direct"))
+ self.assert_type("topic", channel.exchange_query(name="amq.topic"))
+ self.assert_type("fanout", channel.exchange_query(name="amq.fanout"))
+ self.assert_type("headers", channel.exchange_query(name="amq.match"))
+ self.assert_type("direct", channel.exchange_query(name=""))
+ #declare an exchange
+ channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = channel.exchange_query(name="my-test-exchange")
+ self.assert_type("direct", response)
+ self.assertEqual(False, response.durable)
+ self.assertEqual(False, response.not_found)
+ #delete the exchange
+ channel.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+ def assert_type(self, expected_type, response):
+ self.assertEqual(expected_type, response.__getattr__("type"))
+
+ def test_binding_query_direct(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.direct")
+
+ def test_binding_query_topic(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.topic")
+
+ def binding_query_with_key(self, exchange_name):
+ channel = self.channel
+ #setup: create two queues
+ channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+
+ channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+ def test_binding_query_fanout(self):
+ """
+ Test that the binding_query method works as expected with fanout exchange
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+ def test_binding_query_header(self):
+ """
+ Test that the binding_query method works as expected with headers exchanges
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange="amq.match", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
+
diff --git a/python/tests_0-10_preview/queue.py b/python/tests_0-10_preview/queue.py
new file mode 100644
index 0000000000..7b3590d11b
--- /dev/null
+++ b/python/tests_0-10_preview/queue.py
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueueTests(TestBase):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_purge(self):
+ """
+ Test that the purge method removes messages from the queue
+ """
+ channel = self.channel
+ #setup, declare a queue and add some messages to it:
+ channel.exchange_declare(exchange="test-exchange", type="direct")
+ channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
+ channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
+ channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"}))
+
+ #check that the queue now reports 3 messages:
+ channel.queue_declare(queue="test-queue")
+ reply = channel.queue_query(queue="test-queue")
+ self.assertEqual(3, reply.message_count)
+
+ #now do the purge, then test that three messages are purged and the count drops to 0
+ channel.queue_purge(queue="test-queue");
+ reply = channel.queue_query(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
+
+ #send a further message and consume it, ensuring that the other messages are really gone
+ channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
+ self.subscribe(queue="test-queue", destination="tag")
+ queue = self.client.queue("tag")
+ msg = queue.get(timeout=1)
+ self.assertEqual("four", msg.content.body)
+
+ #check error conditions (use new channels):
+ channel = self.client.channel(2)
+ channel.session_open()
+ try:
+ #queue specified but doesn't exist:
+ channel.queue_purge(queue="invalid-queue")
+ self.fail("Expected failure when purging non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(3)
+ channel.session_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.queue_purge()
+ self.fail("Expected failure when purging unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ #cleanup
+ other = self.connect()
+ channel = other.channel(1)
+ channel.session_open()
+ channel.exchange_delete(exchange="test-exchange")
+
+ def test_declare_exclusive(self):
+ """
+ Test that the exclusive field is honoured in queue.declare
+ """
+ # TestBase.setUp has already opened channel(1)
+ c1 = self.channel
+ # Here we open a second separate connection:
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.session_open()
+
+ #declare an exclusive queue:
+ c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ try:
+ #other connection should not be allowed to declare this:
+ c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ self.fail("Expected second exclusive queue_declare to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(405, e.args[0])
+
+
+ def test_declare_passive(self):
+ """
+ Test that the passive field is honoured in queue.declare
+ """
+ channel = self.channel
+ #declare an exclusive queue:
+ channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="passive-queue-1", passive=True)
+ try:
+ #other connection should not be allowed to declare this:
+ channel.queue_declare(queue="passive-queue-2", passive=True)
+ self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_bind(self):
+ """
+ Test various permutations of the queue.bind method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+
+ #straightforward case, both exchange & queue exist so no errors expected:
+ channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+
+ #use the queue name where the routing key is not specified:
+ channel.queue_bind(queue="queue-1", exchange="amq.direct")
+
+ #try and bind to non-existant exchange
+ try:
+ channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+ self.fail("Expected bind to non-existant exchange to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #need to reopen a channel:
+ channel = self.client.channel(2)
+ channel.session_open()
+
+ #try and bind non-existant queue:
+ try:
+ channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+ self.fail("Expected bind of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ def test_unbind_direct(self):
+ self.unbind_test(exchange="amq.direct", routing_key="key")
+
+ def test_unbind_topic(self):
+ self.unbind_test(exchange="amq.topic", routing_key="key")
+
+ def test_unbind_fanout(self):
+ self.unbind_test(exchange="amq.fanout")
+
+ def test_unbind_headers(self):
+ self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+ def unbind_test(self, exchange, routing_key="", args=None, headers={}):
+ #bind two queues and consume from them
+ channel = self.channel
+
+ channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+
+ self.subscribe(queue="queue-1", destination="queue-1")
+ self.subscribe(queue="queue-2", destination="queue-2")
+
+ queue1 = self.client.queue("queue-1")
+ queue2 = self.client.queue("queue-2")
+
+ channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
+
+ #send a message that will match both bindings
+ channel.message_transfer(destination=exchange,
+ content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
+
+ #unbind first queue
+ channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+
+ #send another message
+ channel.message_transfer(destination=exchange,
+ content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
+
+ #check one queue has both messages and the other has only one
+ self.assertEquals("one", queue1.get(timeout=1).content.body)
+ try:
+ msg = queue1.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.content.body)
+ except Empty: pass
+
+ self.assertEquals("one", queue2.get(timeout=1).content.body)
+ self.assertEquals("two", queue2.get(timeout=1).content.body)
+ try:
+ msg = queue2.get(timeout=1)
+ self.fail("Got extra message: " + msg)
+ except Empty: pass
+
+
+ def test_delete_simple(self):
+ """
+ Test core queue deletion behaviour
+ """
+ channel = self.channel
+
+ #straight-forward case:
+ channel.queue_declare(queue="delete-me")
+ channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
+ channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+ channel.queue_delete(queue="delete-me")
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="delete-me", passive=True)
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #check attempted deletion of non-existant queue is handled correctly:
+ channel = self.client.channel(2)
+ channel.session_open()
+ try:
+ channel.queue_delete(queue="i-dont-exist", if_empty=True)
+ self.fail("Expected delete of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+
+ def test_delete_ifempty(self):
+ """
+ Test that if_empty field of queue_delete is honoured
+ """
+ channel = self.channel
+
+ #create a queue and add a message to it (use default binding):
+ channel.queue_declare(queue="delete-me-2")
+ channel.queue_declare(queue="delete-me-2", passive=True)
+ channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+
+ #try to delete, but only if empty:
+ try:
+ channel.queue_delete(queue="delete-me-2", if_empty=True)
+ self.fail("Expected delete if_empty to fail for non-empty queue")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+ #need new channel now:
+ channel = self.client.channel(2)
+ channel.session_open()
+
+ #empty queue:
+ self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
+ queue = self.client.queue("consumer_tag")
+ msg = queue.get(timeout=1)
+ self.assertEqual("message", msg.content.body)
+ channel.message_cancel(destination="consumer_tag")
+
+ #retry deletion on empty queue:
+ channel.queue_delete(queue="delete-me-2", if_empty=True)
+
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-2", passive=True)
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ def test_delete_ifunused(self):
+ """
+ Test that if_unused field of queue_delete is honoured
+ """
+ channel = self.channel
+
+ #create a queue and register a consumer:
+ channel.queue_declare(queue="delete-me-3")
+ channel.queue_declare(queue="delete-me-3", passive=True)
+ self.subscribe(destination="consumer_tag", queue="delete-me-3")
+
+ #need new channel now:
+ channel2 = self.client.channel(2)
+ channel2.session_open()
+ #try to delete, but only if empty:
+ try:
+ channel2.queue_delete(queue="delete-me-3", if_unused=True)
+ self.fail("Expected delete if_unused to fail for queue with existing consumer")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+
+ channel.message_cancel(destination="consumer_tag")
+ channel.queue_delete(queue="delete-me-3", if_unused=True)
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-3", passive=True)
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_autodelete_shared(self):
+ """
+ Test auto-deletion (of non-exclusive queues)
+ """
+ channel = self.channel
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.session_open()
+
+ channel.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+ #consume from both channels
+ reply = channel.basic_consume(queue="auto-delete-me")
+ channel2.basic_consume(queue="auto-delete-me")
+
+ #implicit cancel
+ channel2.session_close()
+
+ #check it is still there
+ channel.queue_declare(queue="auto-delete-me", passive=True)
+
+ #explicit cancel => queue is now unused again:
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+ #NOTE: this assumes there is no timeout in use
+
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="auto-delete-me", passive=True)
+ self.fail("Expected queue to have been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
diff --git a/python/tests_0-10_preview/testlib.py b/python/tests_0-10_preview/testlib.py
new file mode 100644
index 0000000000..a0355c4ce0
--- /dev/null
+++ b/python/tests_0-10_preview/testlib.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Tests for the testlib itself.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from Queue import Empty
+
+import sys
+from traceback import *
+
+def mytrace(frame, event, arg):
+ print_stack(frame);
+ print "===="
+ return mytrace
+
+class TestBaseTest(TestBase):
+ """Verify TestBase functions work as expected"""
+
+ def testAssertEmptyPass(self):
+ """Test assert empty works"""
+ self.queue_declare(queue="empty")
+ q = self.consume("empty")
+ self.assertEmpty(q)
+ try:
+ q.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Empty: None # Ignore
+
+ def testAssertEmptyFail(self):
+ self.queue_declare(queue="full")
+ q = self.consume("full")
+ self.channel.message_transfer(content=Content("", properties={'routing_key':"full"}))
+ try:
+ self.assertEmpty(q);
+ self.fail("assertEmpty did not assert on non-empty queue")
+ except AssertionError: None # Ignore
+
+ def testMessageProperties(self):
+ """Verify properties are passed with message"""
+ props={"x":1, "y":2}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+
diff --git a/python/tests_0-10_preview/tx.py b/python/tests_0-10_preview/tx.py
new file mode 100644
index 0000000000..3fd1065af3
--- /dev/null
+++ b/python/tests_0-10_preview/tx.py
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class TxTests(TestBase):
+ """
+ Tests for 'methods' on the amqp tx 'class'
+ """
+
+ def test_commit(self):
+ """
+ Test that commited publishes are delivered and commited acks are not re-delivered
+ """
+ channel2 = self.client.channel(2)
+ channel2.session_open()
+ self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ channel2.tx_commit()
+ channel2.session_close()
+
+ #use a different channel with new subscriptions to ensure
+ #there is no redelivery of acked messages:
+ channel = self.channel
+ channel.tx_select()
+
+ self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
+ queue_a = self.client.queue("qa")
+
+ self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
+ queue_b = self.client.queue("qb")
+
+ self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
+ queue_c = self.client.queue("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("TxMessage %d" % i, msg.content.body)
+ msg.complete()
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("TxMessage 6", msg.content.body)
+ msg.complete()
+
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("TxMessage 7", msg.content.body)
+ msg.complete()
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.tx_commit()
+
+ def test_auto_rollback(self):
+ """
+ Test that a channel closed with an open transaction is effectively rolled back
+ """
+ channel2 = self.client.channel(2)
+ channel2.session_open()
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel2.session_close()
+ channel = self.channel
+ channel.tx_select()
+
+ self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1)
+ queue_a = self.client.queue("qa")
+
+ self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1)
+ queue_b = self.client.queue("qb")
+
+ self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1)
+ queue_c = self.client.queue("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ msg.complete()
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+ msg.complete()
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+ msg.complete()
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.tx_commit()
+
+ def test_rollback(self):
+ """
+ Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued)
+ for d in ["sub_a", "sub_b", "sub_c"]:
+ channel.message_stop(destination=d)
+
+ channel.tx_rollback()
+
+ #restart susbcriptions
+ for d in ["sub_a", "sub_b", "sub_c"]:
+ channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ msg.complete()
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+ msg.complete()
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+ msg.complete()
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.tx_commit()
+
+ def perform_txn_work(self, channel, name_a, name_b, name_c):
+ """
+ Utility method that does some setup and some work under a transaction. Used for testing both
+ commit and rollback
+ """
+ #setup:
+ channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
+ channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
+ channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+
+ key = "my_key_" + name_b
+ topic = "my_topic_" + name_c
+
+ channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
+ channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+
+ for i in range(1, 5):
+ channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
+
+ channel.message_transfer(destination="amq.direct",
+ content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
+ channel.message_transfer(destination="amq.topic",
+ content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
+
+ channel.tx_select()
+
+ #consume and ack messages
+ self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
+ queue_a = self.client.queue("sub_a")
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg.complete()
+
+ self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
+ queue_b = self.client.queue("sub_b")
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+ msg.complete()
+
+ sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
+ queue_c = self.client.queue("sub_c")
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+ msg.complete()
+
+ #publish messages
+ for i in range(1, 5):
+ channel.message_transfer(destination="amq.topic",
+ content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
+ body="TxMessage %d" % i))
+
+ channel.message_transfer(destination="amq.direct",
+ content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
+ body="TxMessage 6"))
+ channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
+ body="TxMessage 7"))
+ return queue_a, queue_b, queue_c