diff options
30 files changed, 3585 insertions, 206 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 6e92f89706..3314ec6be3 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -83,6 +83,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); } Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); } Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); } // Handlers no longer implemented in BrokerAdapter: #define BADHANDLER() assert(0); throw framing::NotImplementedException("") diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 15c29ed482..5ab0dd84a9 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -37,7 +37,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : HandlerImpl(s), exchangeImpl(s), queueImpl(s), - messageImpl(s) + messageImpl(s), + executionImpl(s) {} @@ -377,6 +378,28 @@ void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers) } */ + +void SessionAdapter::ExecutionHandlerImpl::sync() +{ + //TODO +} + +void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/) +{ + //TODO +} + +void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, + uint32_t /*commandId*/, + uint8_t /*classCode*/, + uint8_t /*commandCode*/, + uint8_t /*fieldIndex*/, + const std::string& /*description*/, + const framing::FieldTable& /*errorInfo*/) +{ + //TODO +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 0dd3529359..c2d61392d7 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -55,6 +55,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Message010Handler* getMessage010Handler(){ return &messageImpl; } Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; } Queue010Handler* getQueue010Handler(){ return &queueImpl; } + Execution010Handler* getExecution010Handler(){ return &executionImpl; } BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); } @@ -172,9 +173,27 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations }; + class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl + { + public: + ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + + void sync(); + void result(uint32_t commandId, const string& value); + void exception(uint16_t errorCode, + uint32_t commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t fieldIndex, + const std::string& description, + const framing::FieldTable& errorInfo); + + }; + ExchangeHandlerImpl exchangeImpl; QueueHandlerImpl queueImpl; MessageHandlerImpl messageImpl; + ExecutionHandlerImpl executionImpl; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 919a3e6ee8..3baa3a89a7 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -69,12 +69,6 @@ void SessionHandler::handleIn(AMQFrame& f) { QPID_MSG("Channel " << channel.get() << " is not open")); } } - } catch(const ChannelException& e) { - ignoring=true; // Ignore trailing frames sent by client. - session->detach(); - session.reset(); - //TODO: implement new exception handling mechanism - //peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -83,6 +77,12 @@ void SessionHandler::handleIn(AMQFrame& f) { } } +void SessionHandler::destroy() { + ignoring=true; // Ignore trailing frames sent by client. + session->detach(); + session.reset(); +} + void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 4b031f2951..fa013a1c15 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -70,6 +70,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, void localSuspend(); void detach() { localSuspend(); } void sendCompletion(); + void destroy(); protected: void handleIn(framing::AMQFrame&); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 571b8848ae..ceaa70db18 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -168,16 +168,16 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method) +void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) { - SequenceNumber id = nextIn++; + id = nextIn++; Invoker::Result invocation = invoke(adapter, *method); completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); } else if (invocation.hasResult()) { - getProxy().getExecution().result(id.getValue(), invocation.getResult()); + getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { sendCompletion(); @@ -185,16 +185,18 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) //TODO: if window gets too large send unsolicited completion } -void SessionState::handleContent(AMQFrame& frame) +void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (!msg) {//start of frameset will be indicated by frame flags - SequenceNumber id = nextIn++; + if (frame.getBof() && frame.getBos()) {//start of frameset + id = nextIn++; msgBuilder.start(id); msg = msgBuilder.getMessage(); + } else { + id = msg->getCommandId(); } msgBuilder.handle(frame); - if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + if (frame.getEof() && frame.getEos()) {//end of frameset msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); @@ -210,16 +212,27 @@ void SessionState::handle(AMQFrame& frame) { received(frame); - //TODO: make command handling more uniform, regardless of whether - //commands carry content. (For now, assume all single frame - //assmblies are non-content bearing and all content-bearing - //assmeblies will have more than one frame): - if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod()); - } else { - handleContent(frame); + SequenceNumber commandId; + try { + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assemblies are non-content bearing and all content-bearing + //assemblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod(), commandId); + } else { + handleContent(frame, commandId); + } + } catch(const ChannelException& e) { + //TODO: better implementation of new exception handling mechanism + AMQMethodBody* m = frame.getMethod(); + if (m) { + getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); + } else { + getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); + } + handler->destroy(); } - } DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index c936edee21..ecf0b41a7a 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -93,8 +93,6 @@ class SessionState : public framing::SessionState, void activateOutput(); void handle(framing::AMQFrame& frame); - void handleCommand(framing::AMQMethodBody* method); - void handleContent(framing::AMQFrame& frame); void complete(const framing::SequenceSet& ranges); void sendCompletion(); @@ -138,8 +136,10 @@ class SessionState : public framing::SessionState, RangedOperation ackOp; management::Session::shared_ptr mgmtObject; + void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id); + void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id); - friend class SessionManager; + friend class SessionManager; }; diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 3750f4a9a8..eeb658600d 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -49,12 +49,15 @@ uint32_t AMQFrame::frameOverhead() { void AMQFrame::encode(Buffer& buffer) const { + //set track first (controls on track 0, everything else on 1): + uint8_t track = getBody()->type() ? 1 : 0; + uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); buffer.putOctet(flags); buffer.putOctet(getBody()->type()); buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself) buffer.putOctet(0); - buffer.putOctet(0x0f & subchannel); + buffer.putOctet(0x0f & track); buffer.putShort(channel); buffer.putLong(0); body->encode(buffer); diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests index d9754ed0fb..73b3b970c1 100755 --- a/cpp/src/tests/python_tests +++ b/cpp/src/tests/python_tests @@ -1,7 +1,7 @@ #!/bin/sh # Run the python tests. if test -d ../../../python ; then - cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS + cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10_preview.txt -b localhost:$QPID_PORT $PYTHON_TESTS && ./run-tests --skip-self-test -v -s "0-10" -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS else echo Warning: python tests not found. fi diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml index 789887ae81..b838a466c4 100644 --- a/cpp/xml/extra.xml +++ b/cpp/xml/extra.xml @@ -582,6 +582,33 @@ </class> +<class name="execution010" index="3"> + <method name = "sync" index="1"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + </method> + <method name = "result" index="2"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + <field name="command-id" domain="command-id"/> + <field name="value" domain="long-struct"/> + </method> + <method name = "exception" index="3"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + <field name="error-code" domain="short"/> + <field name="command-id" domain="long"/> + <field name="class-code" domain="octet"/> + <field name="command-code" domain="octet"/> + <field name="field-index" domain="octet"/> + <field name="description" domain="mediumstr"/> + <field name="error-info" domain="table"/> + </method> +</class> + <class name="message010" index="4"> <doc>blah, blah</doc> <method name = "transfer" index="1"> @@ -708,7 +735,7 @@ <field name="binding-key" domain="shortstr"/> <field name="arguments" domain="table"/> <result> - <struct size="long" type="1"> + <struct size="long" type="2"> <field name="exchange-not-found" domain="bit"/> <field name="queue-not-found" domain="bit"/> <field name="queue-not-matched" domain="bit"/> diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 77031cad08..974c165ae9 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,6 +1,97 @@ tests.codec.FieldTableTestCase.test_field_table_decode tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair tests.codec.FieldTableTestCase.test_field_table_name_value_pair -tests_0-10.alternate_exchange.AlternateExchangeTests.test_immediate +tests_0-10.query.QueryTests.test_exchange_bound_header +tests_0-10.persistence.PersistenceTests.test_ack_message_from_deleted_queue +tests_0-10.persistence.PersistenceTests.test_delete_queue_after_publish +tests_0-10.persistence.PersistenceTests.test_queue_deletion +tests_0-10.tx.TxTests.test_auto_rollback +tests_0-10.tx.TxTests.test_commit +tests_0-10.tx.TxTests.test_rollback +tests_0-10.execution.ExecutionTests.test_flush +tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_exchange +tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_queue +tests_0-10.alternate_exchange.AlternateExchangeTests.test_queue_delete +tests_0-10.alternate_exchange.AlternateExchangeTests.test_unroutable +tests_0-10.exchange.DeclareMethodPassiveFieldNotFoundRuleTests.test +tests_0-10.exchange.DefaultExchangeRuleTests.testDefaultExchange +tests_0-10.exchange.ExchangeTests.testHeadersBindNoMatchArg +tests_0-10.exchange.HeadersExchangeTests.testMatchAll +tests_0-10.exchange.HeadersExchangeTests.testMatchAny +tests_0-10.exchange.MiscellaneousErrorsTests.testDifferentDeclaredType +tests_0-10.exchange.MiscellaneousErrorsTests.testTypeNotKnown +tests_0-10.exchange.RecommendedTypesRuleTests.testDirect +tests_0-10.exchange.RecommendedTypesRuleTests.testFanout +tests_0-10.exchange.RecommendedTypesRuleTests.testHeaders +tests_0-10.exchange.RecommendedTypesRuleTests.testTopic +tests_0-10.exchange.RequiredInstancesRuleTests.testAmqDirect +tests_0-10.exchange.RequiredInstancesRuleTests.testAmqFanOut +tests_0-10.exchange.RequiredInstancesRuleTests.testAmqMatch +tests_0-10.exchange.RequiredInstancesRuleTests.testAmqTopic +tests_0-10.dtx.DtxTests.test_bad_resume +tests_0-10.dtx.DtxTests.test_end +tests_0-10.dtx.DtxTests.test_end_suspend_and_fail +tests_0-10.dtx.DtxTests.test_end_unknown_xid +tests_0-10.dtx.DtxTests.test_forget_xid_on_completion +tests_0-10.dtx.DtxTests.test_get_timeout +tests_0-10.dtx.DtxTests.test_implicit_end +tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_false +tests_0-10.dtx.DtxTests.test_invalid_commit_one_phase_true +tests_0-10.dtx.DtxTests.test_recover +tests_0-10.dtx.DtxTests.test_select_required +tests_0-10.dtx.DtxTests.test_set_timeout +tests_0-10.dtx.DtxTests.test_simple_commit +tests_0-10.dtx.DtxTests.test_simple_prepare_commit +tests_0-10.dtx.DtxTests.test_simple_prepare_rollback +tests_0-10.dtx.DtxTests.test_simple_rollback +tests_0-10.dtx.DtxTests.test_start_already_known +tests_0-10.dtx.DtxTests.test_start_join +tests_0-10.dtx.DtxTests.test_start_join_and_resume +tests_0-10.dtx.DtxTests.test_suspend_resume +tests_0-10.dtx.DtxTests.test_suspend_start_end_resume +tests_0-10.message.MessageTests.test_ack +tests_0-10.message.MessageTests.test_acquire +tests_0-10.message.MessageTests.test_cancel +tests_0-10.message.MessageTests.test_consume_exclusive +tests_0-10.message.MessageTests.test_consume_no_local +tests_0-10.message.MessageTests.test_consume_no_local_awkward +tests_0-10.message.MessageTests.test_consume_queue_errors +tests_0-10.message.MessageTests.test_consume_unique_consumers +tests_0-10.message.MessageTests.test_credit_flow_bytes +tests_0-10.message.MessageTests.test_credit_flow_messages +tests_0-10.message.MessageTests.test_no_size +tests_0-10.message.MessageTests.test_qos_prefetch_count +tests_0-10.message.MessageTests.test_qos_prefetch_size +tests_0-10.message.MessageTests.test_ranged_ack +tests_0-10.message.MessageTests.test_recover +tests_0-10.message.MessageTests.test_recover_requeue +tests_0-10.message.MessageTests.test_reject +tests_0-10.message.MessageTests.test_release +tests_0-10.message.MessageTests.test_release_ordering +tests_0-10.message.MessageTests.test_release_unacquired +tests_0-10.message.MessageTests.test_subscribe_not_acquired +tests_0-10.message.MessageTests.test_subscribe_not_acquired_2 +tests_0-10.message.MessageTests.test_subscribe_not_acquired_3 +tests_0-10.message.MessageTests.test_window_flow_bytes +tests_0-10.message.MessageTests.test_window_flow_messages +tests_0-10.testlib.TestBaseTest.testAssertEmptyFail +tests_0-10.testlib.TestBaseTest.testAssertEmptyPass +tests_0-10.testlib.TestBaseTest.testMessageProperties +tests_0-10.queue.QueueTests.test_autodelete_shared +tests_0-10.queue.QueueTests.test_bind +tests_0-10.queue.QueueTests.test_declare_exclusive +tests_0-10.queue.QueueTests.test_declare_passive +tests_0-10.queue.QueueTests.test_delete_ifempty +tests_0-10.queue.QueueTests.test_delete_ifunused +tests_0-10.queue.QueueTests.test_delete_simple +tests_0-10.queue.QueueTests.test_purge +tests_0-10.queue.QueueTests.test_unbind_direct +tests_0-10.queue.QueueTests.test_unbind_fanout +tests_0-10.queue.QueueTests.test_unbind_headers +tests_0-10.queue.QueueTests.test_unbind_topic tests_0-10.broker.BrokerTests.test_closed_channel - +tests_0-10.broker.BrokerTests.test_ack_and_no_ack +tests_0-10.broker.BrokerTests.test_invalid_channel +tests_0-10.broker.BrokerTests.test_simple_delivery_queued +tests_0-10.broker.BrokerTests.test_simple_delivery_immediate +tests_0-10.example.ExampleTest.test_example diff --git a/python/cpp_failing_0-10_preview.txt b/python/cpp_failing_0-10_preview.txt new file mode 100644 index 0000000000..91c2a7fce4 --- /dev/null +++ b/python/cpp_failing_0-10_preview.txt @@ -0,0 +1,6 @@ +tests.codec.FieldTableTestCase.test_field_table_decode +tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair +tests.codec.FieldTableTestCase.test_field_table_name_value_pair +tests_0-10_preview.alternate_exchange.AlternateExchangeTests.test_immediate +tests_0-10_preview.broker.BrokerTests.test_closed_channel + diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index 5894981fc6..2dcba4e917 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -111,6 +111,11 @@ class Codec(Packer): def write_str8(self, s): self.write_vbin8(s.encode("utf8")) + def read_str16(self): + return self.read_vbin16().decode("utf8") + def write_str16(self, s): + self.write_vbin16(s.encode("utf8")) + def read_vbin16(self): return self.read(self.read_uint16()) @@ -125,9 +130,10 @@ class Codec(Packer): self.write(b) def write_map(self, m): - pass + self.write_uint32(0) #hack def read_map(self): - pass + size = self.read_uint32() #hack + self.read(size) #hack def write_array(self, a): pass diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 5174fe10f4..e8e54b3a56 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -29,6 +29,11 @@ from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message +#0-10 support +from qpid.connection010 import Connection +from qpid.spec010 import load +from qpid.util import connect + def findmodules(root): """Find potential python modules under directory root""" found = [] @@ -125,23 +130,29 @@ Options: if opt in ("-S", "--skip-self-test"): self.skip_self_test = True if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value # Abbreviations for default settings. - if (self.specfile == "0-8"): - self.specfile = self.get_spec_file("amqp.0-8.xml") - elif (self.specfile == "0-9"): - self.specfile = self.get_spec_file("amqp.0-9.xml") - self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) - - if (self.specfile == None): - self._die("No XML specification provided") - print "Using specification from:", self.specfile - self.spec = qpid.spec.load(self.specfile, *self.errata) + if (self.specfile == "0-10"): + self.spec = load(self.get_spec_file("amqp.0-10.xml")) + else: + if (self.specfile == "0-8"): + self.specfile = self.get_spec_file("amqp.0-8.xml") + elif (self.specfile == "0-9"): + self.specfile = self.get_spec_file("amqp.0-9.xml") + self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) + + if (self.specfile == None): + self._die("No XML specification provided") + print "Using specification from:", self.specfile + + self.spec = qpid.spec.load(self.specfile, *self.errata) if len(self.tests) == 0: if not self.skip_self_test: self.tests=findmodules("tests") if self.use08spec(): self.tests+=findmodules("tests_0-8") - elif (self.spec.major == 0 and self.spec.minor == 10) or (self.spec.major == 99 and self.spec.minor == 0): + elif (self.spec.major == 99 and self.spec.minor == 0): + self.tests+=findmodules("tests_0-10_preview") + elif (self.spec.major == 0 and self.spec.minor == 10): self.tests+=findmodules("tests_0-10") else: self.tests+=findmodules("tests_0-9") @@ -330,3 +341,17 @@ class TestBase(unittest.TestCase): self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) +class TestBase010(unittest.TestCase): + """ + Base class for Qpid test cases. using the final 0-10 spec + """ + + def setUp(self): + spec = testrunner.spec + self.conn = Connection(connect(testrunner.host, testrunner.port), spec) + self.conn.start(timeout=10) + self.session = self.conn.session("test-session", timeout=10) + + def tearDown(self): + self.session.close(timeout=10) + self.conn.close(timeout=10) diff --git a/python/tests_0-10/alternate_exchange.py b/python/tests_0-10/alternate_exchange.py index 83f8d85811..225b3cfb69 100644 --- a/python/tests_0-10/alternate_exchange.py +++ b/python/tests_0-10/alternate_exchange.py @@ -93,34 +93,6 @@ class AlternateExchangeTests(TestBase): self.assertEqual("Three", dlq.get(timeout=1).content.body) self.assertEmpty(dlq) - - def test_immediate(self): - """ - Test that messages in a queue being deleted are delivered to the alternate-exchange if specified - """ - channel = self.channel - #set up a 'dead letter queue': - channel.exchange_declare(exchange="dlq", type="fanout") - channel.queue_declare(queue="immediate", exclusive=True, auto_delete=True) - channel.queue_bind(exchange="dlq", queue="immediate") - self.subscribe(destination="dlq", queue="immediate") - dlq = self.client.queue("dlq") - - #create a queue using the dlq as its alternate exchange: - channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True, auto_delete=True) - #send it some messages: - #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK - channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"})) - - #check the messages were delivered to the dlq: - self.assertEqual("no one wants me", dlq.get(timeout=1).content.body) - self.assertEmpty(dlq) - - #cleanup: - channel.queue_delete(queue="no-consumers") - channel.exchange_delete(exchange="dlq") - - def test_delete_while_used_by_queue(self): """ Ensure an exchange still in use as an alternate-exchange for a diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 99936ba742..bfecb5c166 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -19,9 +19,10 @@ from qpid.client import Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase010 +from qpid.datatypes import Message -class BrokerTests(TestBase): +class BrokerTests(TestBase010): """Tests for basic Broker functionality""" def test_ack_and_no_ack(self): @@ -57,37 +58,36 @@ class BrokerTests(TestBase): """ Test simple message delivery where consume is issued before publish """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + session = self.session + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="amq.fanout") consumer_tag = "tag1" - self.subscribe(queue="test-queue", destination=consumer_tag) - queue = self.client.queue(consumer_tag) + session.message_subscribe(queue="test-queue", destination=consumer_tag) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + queue = session.incoming(consumer_tag) body = "Immediate Delivery" - channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) + session.message_transfer("amq.fanout", None, None, Message(body)) msg = queue.get(timeout=5) self.assert_(msg.content.body == body) - # TODO: Ensure we fail if immediate=True and there's no consumer. - - def test_simple_delivery_queued(self): """ Test basic message delivery where publish is issued before consume (i.e. requires queueing of the message) """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + session = self.session + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="amq.fanout") body = "Queued Delivery" - channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) + session.message_transfer("amq.fanout", None, None, Message(body)) consumer_tag = "tag1" - self.subscribe(queue="test-queue", destination=consumer_tag) - queue = self.client.queue(consumer_tag) + session.message_subscribe(queue="test-queue", destination=consumer_tag) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + queue = session.incoming(consumer_tag) msg = queue.get(timeout=5) self.assert_(msg.content.body == body) diff --git a/python/tests_0-10/query.py b/python/tests_0-10/query.py index eba2ee6dd1..0bbfb079cc 100644 --- a/python/tests_0-10/query.py +++ b/python/tests_0-10/query.py @@ -19,209 +19,212 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase010 -class QueryTests(TestBase): - """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" +class QueryTests(TestBase010): + """Tests for various query methods""" + + def test_queue_query(self): + session = self.session + session.queue_declare(queue="my-queue", exclusive=True) + result = session.queue_query(queue="my-queue") + self.assertEqual("my-queue", result.queue) def test_exchange_query(self): """ Test that the exchange_query method works as expected """ - channel = self.channel + session = self.session #check returned type for the standard exchanges - self.assert_type("direct", channel.exchange_query(name="amq.direct")) - self.assert_type("topic", channel.exchange_query(name="amq.topic")) - self.assert_type("fanout", channel.exchange_query(name="amq.fanout")) - self.assert_type("headers", channel.exchange_query(name="amq.match")) - self.assert_type("direct", channel.exchange_query(name="")) + self.assertEqual("direct", session.exchange_query(name="amq.direct").type) + self.assertEqual("topic", session.exchange_query(name="amq.topic").type) + self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type) + self.assertEqual("headers", session.exchange_query(name="amq.match").type) + self.assertEqual("direct", session.exchange_query(name="").type) #declare an exchange - channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) + session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) #check that the result of a query is as expected - response = channel.exchange_query(name="my-test-exchange") - self.assert_type("direct", response) - self.assertEqual(False, response.durable) - self.assertEqual(False, response.not_found) + response = session.exchange_query(name="my-test-exchange") + self.assertEqual("direct", response.type) + self.assert_(not response.durable) + self.assert_(not response.not_found) #delete the exchange - channel.exchange_delete(exchange="my-test-exchange") + session.exchange_delete(exchange="my-test-exchange") #check that the query now reports not-found - self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found) - - def assert_type(self, expected_type, response): - self.assertEqual(expected_type, response.__getattr__("type")) + self.assert_(session.exchange_query(name="my-test-exchange").not_found) - def test_binding_query_direct(self): + def test_exchange_bound_direct(self): """ - Test that the binding_query method works as expected with the direct exchange + Test that the exchange_bound method works as expected with the direct exchange """ - self.binding_query_with_key("amq.direct") + self.exchange_bound_with_key("amq.direct") - def test_binding_query_topic(self): + def test_exchange_bound_topic(self): """ - Test that the binding_query method works as expected with the direct exchange + Test that the exchange_bound method works as expected with the direct exchange """ - self.binding_query_with_key("amq.topic") + self.exchange_bound_with_key("amq.topic") - def binding_query_with_key(self, exchange_name): - channel = self.channel + def exchange_bound_with_key(self, exchange_name): + session = self.session #setup: create two queues - channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) - channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) - channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key") + session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key") # test detection of any binding to specific queue - response = channel.binding_query(exchange=exchange_name, queue="used-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) + response = session.exchange_bound(exchange=exchange_name, queue="used-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) # test detection of specific binding to any queue - response = channel.binding_query(exchange=exchange_name, routing_key="used-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.key_not_matched) + response = session.exchange_bound(exchange=exchange_name, binding_key="used-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.key_not_matched) # test detection of specific binding to specific queue - response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - self.assertEqual(False, response.key_not_matched) + response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + self.assert_(not response.key_not_matched) # test unmatched queue, unspecified binding - response = channel.binding_query(exchange=exchange_name, queue="unused-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange=exchange_name, queue="unused-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) # test unspecified queue, unmatched binding - response = channel.binding_query(exchange=exchange_name, routing_key="unused-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.key_not_matched) # test matched queue, unmatched binding - response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) + response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) self.assertEqual(True, response.key_not_matched) # test unmatched queue, matched binding - response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) - self.assertEqual(False, response.key_not_matched) + self.assert_(not response.key_not_matched) # test unmatched queue, unmatched binding - response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) self.assertEqual(True, response.key_not_matched) #test exchange not found - self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) #test queue not found - self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found) + self.assertEqual(True, session.exchange_bound(exchange=exchange_name, queue="unknown-queue").queue_not_found) - def test_binding_query_fanout(self): + def test_exchange_bound_fanout(self): """ - Test that the binding_query method works as expected with fanout exchange + Test that the exchange_bound method works as expected with fanout exchange """ - channel = self.channel + session = self.session #setup - channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) - channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) - channel.queue_bind(exchange="amq.fanout", queue="used-queue") + session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="amq.fanout", queue="used-queue") # test detection of any binding to specific queue - response = channel.binding_query(exchange="amq.fanout", queue="used-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) + response = session.exchange_bound(exchange="amq.fanout", queue="used-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) # test unmatched queue, unspecified binding - response = channel.binding_query(exchange="amq.fanout", queue="unused-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) #test exchange not found - self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) #test queue not found - self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found) + self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found) - def test_binding_query_header(self): + def test_exchange_bound_header(self): """ - Test that the binding_query method works as expected with headers exchanges + Test that the exchange_bound method works as expected with headers exchanges """ - channel = self.channel + session = self.session #setup - channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) - channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) - channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) + session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) # test detection of any binding to specific queue - response = channel.binding_query(exchange="amq.match", queue="used-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) + response = session.exchange_bound(exchange="amq.match", queue="used-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) # test detection of specific binding to any queue - response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.args_not_matched) + response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.args_not_matched) # test detection of specific binding to specific queue - response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - self.assertEqual(False, response.args_not_matched) + response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + self.assert_(not response.args_not_matched) # test unmatched queue, unspecified binding - response = channel.binding_query(exchange="amq.match", queue="unused-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange="amq.match", queue="unused-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) # test unspecified queue, unmatched binding - response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.args_not_matched) # test matched queue, unmatched binding - response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) + response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) self.assertEqual(True, response.args_not_matched) # test unmatched queue, matched binding - response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) - self.assertEqual(False, response.args_not_matched) + self.assert_(not response.args_not_matched) # test unmatched queue, unmatched binding - response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) + response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) self.assertEqual(True, response.queue_not_matched) self.assertEqual(True, response.args_not_matched) #test exchange not found - self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) #test queue not found - self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found) + self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found) diff --git a/python/tests_0-10_preview/__init__.py b/python/tests_0-10_preview/__init__.py new file mode 100644 index 0000000000..fe96d9e122 --- /dev/null +++ b/python/tests_0-10_preview/__init__.py @@ -0,0 +1,32 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from alternate_exchange import * +from broker import * +from dtx import * +from example import * +from exchange import * +from execution import * +from message import * +from query import * +from queue import * +from testlib import * +from tx import * diff --git a/python/tests_0-10_preview/alternate_exchange.py b/python/tests_0-10_preview/alternate_exchange.py new file mode 100644 index 0000000000..83f8d85811 --- /dev/null +++ b/python/tests_0-10_preview/alternate_exchange.py @@ -0,0 +1,179 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class AlternateExchangeTests(TestBase): + """ + Tests for the new mechanism for message returns introduced in 0-10 + and available in 0-9 for preview + """ + + def test_unroutable(self): + """ + Test that unroutable messages are delivered to the alternate-exchange if specified + """ + channel = self.channel + #create an exchange with an alternate defined + channel.exchange_declare(exchange="secondary", type="fanout") + channel.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary") + + #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages + channel.queue_declare(queue="returns", exclusive=True, auto_delete=True) + channel.queue_bind(queue="returns", exchange="secondary") + self.subscribe(destination="a", queue="returns") + returned = self.client.queue("a") + + #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages + channel.queue_declare(queue="processed", exclusive=True, auto_delete=True) + channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key") + self.subscribe(destination="b", queue="processed") + processed = self.client.queue("b") + + #publish to the primary exchange + #...one message that makes it to the 'processed' queue: + channel.message_transfer(destination="primary", content=Content("Good", properties={'routing_key':"my-key"})) + #...and one that does not: + channel.message_transfer(destination="primary", content=Content("Bad", properties={'routing_key':"unused-key"})) + + #delete the exchanges + channel.exchange_delete(exchange="primary") + channel.exchange_delete(exchange="secondary") + + #verify behaviour + self.assertEqual("Good", processed.get(timeout=1).content.body) + self.assertEqual("Bad", returned.get(timeout=1).content.body) + self.assertEmpty(processed) + self.assertEmpty(returned) + + def test_queue_delete(self): + """ + Test that messages in a queue being deleted are delivered to the alternate-exchange if specified + """ + channel = self.channel + #set up a 'dead letter queue': + channel.exchange_declare(exchange="dlq", type="fanout") + channel.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="dlq", queue="deleted") + self.subscribe(destination="dlq", queue="deleted") + dlq = self.client.queue("dlq") + + #create a queue using the dlq as its alternate exchange: + channel.queue_declare(queue="delete-me", alternate_exchange="dlq") + #send it some messages: + channel.message_transfer(content=Content("One", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("Two", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("Three", properties={'routing_key':"delete-me"})) + #delete it: + channel.queue_delete(queue="delete-me") + #delete the dlq exchange: + channel.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + self.assertEqual("One", dlq.get(timeout=1).content.body) + self.assertEqual("Two", dlq.get(timeout=1).content.body) + self.assertEqual("Three", dlq.get(timeout=1).content.body) + self.assertEmpty(dlq) + + + def test_immediate(self): + """ + Test that messages in a queue being deleted are delivered to the alternate-exchange if specified + """ + channel = self.channel + #set up a 'dead letter queue': + channel.exchange_declare(exchange="dlq", type="fanout") + channel.queue_declare(queue="immediate", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="dlq", queue="immediate") + self.subscribe(destination="dlq", queue="immediate") + dlq = self.client.queue("dlq") + + #create a queue using the dlq as its alternate exchange: + channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True, auto_delete=True) + #send it some messages: + #TODO: WE HAVE LOST THE IMMEDIATE FLAG; FIX THIS ONCE ITS BACK + channel.message_transfer(content=Content("no one wants me", properties={'routing_key':"no-consumers"})) + + #check the messages were delivered to the dlq: + self.assertEqual("no one wants me", dlq.get(timeout=1).content.body) + self.assertEmpty(dlq) + + #cleanup: + channel.queue_delete(queue="no-consumers") + channel.exchange_delete(exchange="dlq") + + + def test_delete_while_used_by_queue(self): + """ + Ensure an exchange still in use as an alternate-exchange for a + queue can't be deleted + """ + channel = self.channel + channel.exchange_declare(exchange="alternate", type="fanout") + channel.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate") + try: + channel.exchange_delete(exchange="alternate") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except Closed, e: + #cleanup: + other = self.connect() + channel = other.channel(1) + channel.session_open() + channel.exchange_delete(exchange="alternate") + channel.session_close() + other.close() + + self.assertConnectionException(530, e.args[0]) + + + + def test_delete_while_used_by_exchange(self): + """ + Ensure an exchange still in use as an alternate-exchange for + another exchange can't be deleted + """ + channel = self.channel + channel.exchange_declare(exchange="alternate", type="fanout") + channel.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate") + try: + channel.exchange_delete(exchange="alternate") + #cleanup: + channel.exchange_delete(exchange="e") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except Closed, e: + #cleanup: + other = self.connect() + channel = other.channel(1) + channel.session_open() + channel.exchange_delete(exchange="e") + channel.exchange_delete(exchange="alternate") + channel.session_close() + other.close() + + self.assertConnectionException(530, e.args[0]) + + + def assertEmpty(self, queue): + try: + msg = queue.get(timeout=1) + self.fail("Queue not empty: " + msg) + except Empty: None + diff --git a/python/tests_0-10_preview/broker.py b/python/tests_0-10_preview/broker.py new file mode 100644 index 0000000000..99936ba742 --- /dev/null +++ b/python/tests_0-10_preview/broker.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BrokerTests(TestBase): + """Tests for basic Broker functionality""" + + def test_ack_and_no_ack(self): + """ + First, this test tries to receive a message with a no-ack + consumer. Second, this test tries to explicitly receive and + acknowledge a message with an acknowledging consumer. + """ + ch = self.channel + self.queue_declare(ch, queue = "myqueue") + + # No ack consumer + ctag = "tag1" + self.subscribe(ch, queue = "myqueue", destination = ctag) + body = "test no-ack" + ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) + msg = self.client.queue(ctag).get(timeout = 5) + self.assert_(msg.content.body == body) + + # Acknowledging consumer + self.queue_declare(ch, queue = "otherqueue") + ctag = "tag2" + self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1) + ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) + body = "test ack" + ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) + msg = self.client.queue(ctag).get(timeout = 5) + msg.complete() + self.assert_(msg.content.body == body) + + def test_simple_delivery_immediate(self): + """ + Test simple message delivery where consume is issued before publish + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + consumer_tag = "tag1" + self.subscribe(queue="test-queue", destination=consumer_tag) + queue = self.client.queue(consumer_tag) + + body = "Immediate Delivery" + channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + # TODO: Ensure we fail if immediate=True and there's no consumer. + + + def test_simple_delivery_queued(self): + """ + Test basic message delivery where publish is issued before consume + (i.e. requires queueing of the message) + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + body = "Queued Delivery" + channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) + + consumer_tag = "tag1" + self.subscribe(queue="test-queue", destination=consumer_tag) + queue = self.client.queue(consumer_tag) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + def test_invalid_channel(self): + channel = self.client.channel(200) + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for invalid channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + def test_closed_channel(self): + channel = self.client.channel(200) + channel.session_open() + channel.session_close() + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for closed channel") + except Closed, e: + if isinstance(e.args[0], str): self.fail(e) + self.assertConnectionException(504, e.args[0]) diff --git a/python/tests_0-10_preview/dtx.py b/python/tests_0-10_preview/dtx.py new file mode 100644 index 0000000000..f84f91c75a --- /dev/null +++ b/python/tests_0-10_preview/dtx.py @@ -0,0 +1,645 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from struct import pack, unpack +from time import sleep + +class DtxTests(TestBase): + """ + Tests for the amqp dtx related classes. + + Tests of the form test_simple_xxx test the basic transactional + behaviour. The approach here is to 'swap' a message from one queue + to another by consuming and re-publishing in the same + transaction. That transaction is then completed in different ways + and the appropriate result verified. + + The other tests enforce more specific rules and behaviour on a + per-method or per-field basis. + """ + + XA_RBROLLBACK = 1 + XA_RBTIMEOUT = 2 + XA_OK = 0 + tx_counter = 0 + + def reset_channel(self): + self.channel.session_close() + self.channel = self.client.channel(self.channel.id + 1) + self.channel.session_open() + + def test_simple_commit(self): + """ + Test basic one-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "commit") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + + #should close and reopen channel to ensure no unacked messages are held + self.reset_channel() + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("commit", "queue-b") + + def test_simple_prepare_commit(self): + """ + Test basic two-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-commit") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + + self.reset_channel() + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("prepare-commit", "queue-b") + + + def test_simple_rollback(self): + """ + Test basic rollback behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "rollback") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + + self.reset_channel() + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("rollback", "queue-a") + + def test_simple_prepare_rollback(self): + """ + Test basic rollback behaviour after the transaction has been prepared. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-rollback") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + + self.reset_channel() + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("prepare-rollback", "queue-a") + + def test_select_required(self): + """ + check that an error is flagged if select is not issued before + start or end + """ + channel = self.channel + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx) + + #if we get here we have failed, but need to do some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Channel not selected for use with dtx, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_already_known(self): + """ + Verify that an attempt to start an association with a + transaction that is already known is not allowed (unless the + join flag is set). + """ + #create two channels on different connection & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + other = self.connect() + channel2 = other.channel(1) + channel2.session_open() + channel2.dtx_demarcation_select() + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other without the join set + failed = False + try: + channel2.dtx_demarcation_start(xid=tx) + except Closed, e: + failed = True + error = e + + #cleanup: + if not failed: + channel2.dtx_demarcation_end(xid=tx) + other.close() + channel1.dtx_demarcation_end(xid=tx) + channel1.dtx_coordination_rollback(xid=tx) + + #verification: + if failed: self.assertConnectionException(503, e.args[0]) + else: self.fail("Xid already known, expected exception!") + + def test_forget_xid_on_completion(self): + """ + Verify that a xid is 'forgotten' - and can therefore be used + again - once it is completed. + """ + #do some transactional work & complete the transaction + self.test_simple_commit() + # channel has been reset, so reselect for use with dtx + self.channel.dtx_demarcation_select() + + #start association for the same xid as the previously completed txn + tx = self.xid("my-xid") + self.channel.dtx_demarcation_start(xid=tx) + self.channel.dtx_demarcation_end(xid=tx) + self.channel.dtx_coordination_rollback(xid=tx) + + def test_start_join_and_resume(self): + """ + Ensure the correct error is signalled when both the join and + resume flags are set on starting an association between a + channel and a transcation. + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx, join=True, resume=True) + #failed, but need some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Join and resume both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_join(self): + """ + Verify 'join' behaviour, where a channel is associated with a + transaction that is already associated with another channel. + """ + #create two channels & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + channel2 = self.client.channel(2) + channel2.session_open() + channel2.dtx_demarcation_select() + + #setup + channel1.queue_declare(queue="one", exclusive=True, auto_delete=True) + channel1.queue_declare(queue="two", exclusive=True, auto_delete=True) + channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other with the join flag set + channel2.dtx_demarcation_start(xid=tx, join=True) + + #do work through each channel + self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' + + #mark end on both channels + channel1.dtx_demarcation_end(xid=tx) + channel2.dtx_demarcation_end(xid=tx) + + #commit and check + channel1.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + + def test_suspend_resume(self): + """ + Test suspension and resumption of an association + """ + channel = self.channel + channel.dtx_demarcation_select() + + #setup + channel.queue_declare(queue="one", exclusive=True, auto_delete=True) + channel.queue_declare(queue="two", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + + tx = self.xid("dummy") + + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' + channel.dtx_demarcation_end(xid=tx, suspend=True) + + channel.dtx_demarcation_start(xid=tx, resume=True) + self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' + channel.dtx_demarcation_end(xid=tx) + + #commit and check + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_suspend_start_end_resume(self): + """ + Test suspension and resumption of an association with work + done on another transaction when the first transaction is + suspended + """ + channel = self.channel + channel.dtx_demarcation_select() + + #setup + channel.queue_declare(queue="one", exclusive=True, auto_delete=True) + channel.queue_declare(queue="two", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) + channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + + tx = self.xid("dummy") + + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' + channel.dtx_demarcation_end(xid=tx, suspend=True) + + channel.dtx_demarcation_start(xid=tx, resume=True) + self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' + channel.dtx_demarcation_end(xid=tx) + + #commit and check + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_end_suspend_and_fail(self): + """ + Verify that the correct error is signalled if the suspend and + fail flag are both set when disassociating a transaction from + the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("suspend_and_fail") + channel.dtx_demarcation_start(xid=tx) + try: + channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) + self.fail("Suspend and fail both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.session_open() + channel.dtx_coordination_rollback(xid=tx) + channel.session_close() + other.close() + + + def test_end_unknown_xid(self): + """ + Verifies that the correct exception is thrown when an attempt + is made to end the association for a xid not previously + associated with the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("unknown-xid") + try: + channel.dtx_demarcation_end(xid=tx) + self.fail("Attempted to end association with unknown xid, expected exception!") + except Closed, e: + #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... + self.assertConnectionException(503, e.args[0]) + + def test_end(self): + """ + Verify that the association is terminated by end and subsequent + operations are non-transactional + """ + channel = self.client.channel(2) + channel.session_open() + channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) + + #publish a message under a transaction + channel.dtx_demarcation_select() + tx = self.xid("dummy") + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage")) + channel.dtx_demarcation_end(xid=tx) + + #now that association with txn is ended, publish another message + channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage")) + + #check the second message is available, but not the first + self.assertMessageCount(1, "tx-queue") + self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1) + msg = self.client.queue("results").get(timeout=1) + self.assertEqual("two", msg.content['message_id']) + channel.message_cancel(destination="results") + #ack the message then close the channel + msg.complete() + channel.session_close() + + channel = self.channel + #commit the transaction and check that the first message (and + #only the first message) is then delivered + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "tx-queue") + self.assertMessageId("one", "tx-queue") + + def test_invalid_commit_one_phase_true(self): + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.session_open() + tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) + tester.dtx_demarcation_end(xid=tx) + tester.dtx_coordination_prepare(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=True) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.session_close() + other.close() + self.fail("Invalid use of one_phase=True, expected exception!") + + def test_invalid_commit_one_phase_false(self): + """ + Test that a commit with one_phase = False is rejected if the + transaction in question has not yet been prepared. + """ + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.session_open() + tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) + tester.dtx_demarcation_end(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=False) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.session_close() + other.close() + self.fail("Invalid use of one_phase=False, expected exception!") + + def test_implicit_end(self): + """ + Test that an association is implicitly ended when the channel + is closed (whether by exception or explicit client request) + and the transaction in question is marked as rollback only. + """ + channel1 = self.channel + channel2 = self.client.channel(2) + channel2.session_open() + + #setup: + channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) + tx = self.xid("dummy") + + channel2.dtx_demarcation_select() + channel2.dtx_demarcation_start(xid=tx) + channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1) + channel2.message_flow(destination="dummy", unit=0, value=1) + channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF) + self.client.queue("dummy").get(timeout=1).complete() + channel2.message_cancel(destination="dummy") + channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) + channel2.session_close() + + self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) + channel1.dtx_coordination_rollback(xid=tx) + + def test_get_timeout(self): + """ + Check that get-timeout returns the correct value, (and that a + transaction with a timeout can complete normally) + """ + channel = self.channel + tx = self.xid("dummy") + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) + channel.dtx_coordination_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status) + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + + def test_set_timeout(self): + """ + Test the timeout of a transaction results in the expected + behaviour + """ + #open new channel to allow self.channel to be used in checking te queue + channel = self.client.channel(2) + channel.session_open() + #setup: + tx = self.xid("dummy") + channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) + channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage")) + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "queue-a", "queue-b") + channel.dtx_coordination_set_timeout(xid=tx, timeout=2) + sleep(3) + #check that the work has been rolled back already + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("timeout", "queue-a") + #check the correct codes are returned when we try to complete the txn + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status) + + + + def test_recover(self): + """ + Test basic recover behaviour + """ + channel = self.channel + + channel.dtx_demarcation_select() + channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + + prepared = [] + for i in range(1, 10): + tx = self.xid("tx%s" % (i)) + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i))) + channel.dtx_demarcation_end(xid=tx) + if i in [2, 5, 6, 8]: + channel.dtx_coordination_prepare(xid=tx) + prepared.append(tx) + else: + channel.dtx_coordination_rollback(xid=tx) + + xids = channel.dtx_coordination_recover().in_doubt + + #rollback the prepared transactions returned by recover + for x in xids: + channel.dtx_coordination_rollback(xid=x) + + #validate against the expected list of prepared transactions + actual = set(xids) + expected = set(prepared) + intersection = actual.intersection(expected) + + if intersection != expected: + missing = expected.difference(actual) + extra = actual.difference(expected) + for x in missing: + channel.dtx_coordination_rollback(xid=x) + self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) + + def test_bad_resume(self): + """ + Test that a resume on a session not selected for use with dtx fails + """ + channel = self.channel + try: + channel.dtx_demarcation_start(resume=True) + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def xid(self, txid): + DtxTests.tx_counter += 1 + branchqual = "v%s" % DtxTests.tx_counter + return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual + + def txswap(self, tx, id): + channel = self.channel + #declare two queues: + channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) + channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + #put message with specified id on one queue: + channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage")) + + #start the transaction: + channel.dtx_demarcation_select() + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status) + + #'swap' the message from one queue to the other, under that transaction: + self.swap(self.channel, "queue-a", "queue-b") + + #mark the end of the transactional work: + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status) + + def swap(self, channel, src, dest): + #consume from src: + channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1) + channel.message_flow(destination="temp-swap", unit=0, value=1) + channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) + msg = self.client.queue("temp-swap").get(timeout=1) + channel.message_cancel(destination="temp-swap") + msg.complete(); + + #re-publish to dest + channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']}, + body=msg.content.body)) + + def assertMessageCount(self, expected, queue): + self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count) + + def assertMessageId(self, expected, queue): + self.channel.message_subscribe(queue=queue, destination="results") + self.channel.message_flow(destination="results", unit=0, value=1) + self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF) + self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) + self.channel.message_cancel(destination="results") diff --git a/python/tests_0-10_preview/example.py b/python/tests_0-10_preview/example.py new file mode 100644 index 0000000000..da5ee2441f --- /dev/null +++ b/python/tests_0-10_preview/example.py @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class ExampleTest (TestBase): + """ + An example Qpid test, illustrating the unittest framework and the + python Qpid client. The test class must inherit TestBase. The + test code uses the Qpid client to interact with a qpid broker and + verify it behaves as expected. + """ + + def test_example(self): + """ + An example test. Note that test functions must start with 'test_' + to be recognized by the test framework. + """ + + # By inheriting TestBase, self.client is automatically connected + # and self.channel is automatically opened as channel(1) + # Other channel methods mimic the protocol. + channel = self.channel + + # Now we can send regular commands. If you want to see what the method + # arguments mean or what other commands are available, you can use the + # python builtin help() method. For example: + #help(chan) + #help(chan.exchange_declare) + + # If you want browse the available protocol methods without being + # connected to a live server you can use the amqp-doc utility: + # + # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] + # + # Options: + # -e, --regexp use regex instead of glob when matching + + # Now that we know what commands are available we can use them to + # interact with the server. + + # Here we use ordinal arguments. + self.exchange_declare(channel, 0, "test", "direct") + + # Here we use keyword arguments. + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") + + # Call Channel.basic_consume to register as a consumer. + # All the protocol methods return a message object. The message object + # has fields corresponding to the reply method fields, plus a content + # field that is filled if the reply includes content. In this case the + # interesting field is the consumer_tag. + channel.message_subscribe(queue="test-queue", destination="consumer_tag") + channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) + channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) + + # We can use the Client.queue(...) method to access the queue + # corresponding to our consumer_tag. + queue = self.client.queue("consumer_tag") + + # Now lets publish a message and see if our consumer gets it. To do + # this we need to import the Content class. + sent = Content("Hello World!") + sent["routing_key"] = "key" + channel.message_transfer(destination="test", content=sent) + + # Now we'll wait for the message to arrive. We can use the timeout + # argument in case the server hangs. By default queue.get() will wait + # until a message arrives or the connection to the server dies. + msg = queue.get(timeout=10) + + # And check that we got the right response with assertEqual + self.assertEqual(sent.body, msg.content.body) + + # Now acknowledge the message. + msg.complete() + diff --git a/python/tests_0-10_preview/exchange.py b/python/tests_0-10_preview/exchange.py new file mode 100644 index 0000000000..86c39b7736 --- /dev/null +++ b/python/tests_0-10_preview/exchange.py @@ -0,0 +1,335 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Tests for exchange behaviour. + +Test classes ending in 'RuleTests' are derived from rules in amqp.xml. +""" + +import Queue, logging +from qpid.testlib import TestBase +from qpid.content import Content +from qpid.client import Closed + + +class StandardExchangeVerifier: + """Verifies standard exchange behavior. + + Used as base class for classes that test standard exchanges.""" + + def verifyDirectExchange(self, ex): + """Verify that ex behaves like a direct exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + + def verifyFanOutExchange(self, ex): + """Verify that ex behaves like a fanout exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex) + self.queue_declare(queue="p") + self.channel.queue_bind(queue="p", exchange=ex) + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + + def verifyTopicExchange(self, ex): + """Verify that ex behaves like a topic exchange""" + self.queue_declare(queue="a") + self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + # Shouldn't match + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"})) + self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.assert_(q.empty()) + + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties=headers) + self.channel.message_transfer(destination=ex) # No headers, won't deliver + self.assertEmpty(q); + + +class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server SHOULD implement these standard exchange types: topic, headers. + + Client attempts to declare an exchange with each of these standard types. + """ + + def testDirect(self): + """Declare and test a direct exchange""" + self.exchange_declare(0, exchange="d", type="direct") + self.verifyDirectExchange("d") + + def testFanout(self): + """Declare and test a fanout exchange""" + self.exchange_declare(0, exchange="f", type="fanout") + self.verifyFanOutExchange("f") + + def testTopic(self): + """Declare and test a topic exchange""" + self.exchange_declare(0, exchange="t", type="topic") + self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") + + +class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance is amq. followed by the exchange type name. + + Client creates a temporary queue and attempts to bind to each required + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if + those types are defined). + """ + def testAmqDirect(self): self.verifyDirectExchange("amq.direct") + + def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") + + def testAmqTopic(self): self.verifyTopicExchange("amq.topic") + + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") + +class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST predeclare a direct exchange to act as the default exchange + for content Publish methods and for default queue bindings. + + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + """ + def testDefaultExchange(self): + # Test automatic binding by queue name. + self.queue_declare(queue="d") + self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default queue + self.verifyDirectExchange("") + + +# TODO aconway 2006-09-27: Fill in empty tests: + +class DefaultAccessRuleTests(TestBase): + """ + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + """ + +class ExtensionsRuleTests(TestBase): + """ + The server MAY implement other exchange types as wanted. + """ + + +class DeclareMethodMinimumRuleTests(TestBase): + """ + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + The client creates as many exchanges as it can until the server reports + an error; the number of exchanges successfuly created must be at least + sixteen. + """ + + +class DeclareMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access to + the realm in which the exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeclareMethodExchangeFieldReservedRuleTests(TestBase): + """ + Exchange names starting with "amq." are reserved for predeclared and + standardised exchanges. The client MUST NOT attempt to create an exchange + starting with "amq.". + + + """ + + +class DeclareMethodTypeFieldTypedRuleTests(TestBase): + """ + Exchanges cannot be redeclared with different types. The client MUST not + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + """ + + +class DeclareMethodTypeFieldSupportRuleTests(TestBase): + """ + The client MUST NOT attempt to create an exchange with a type that the + server does not support. + + + """ + + +class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): + """ + If set, and the exchange does not already exist, the server MUST raise a + channel exception with reply code 404 (not found). + """ + def test(self): + try: + self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) + self.fail("Expected 404 for passive declaration of unknown exchange.") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + +class DeclareMethodDurableFieldSupportRuleTests(TestBase): + """ + The server MUST support both durable and transient exchanges. + + + """ + + +class DeclareMethodDurableFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the durable field if the exchange already exists. + + + """ + + +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the auto-delete field if the exchange already + exists. + + + """ + + +class DeleteMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access + rights to the exchange's access realm. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeleteMethodExchangeFieldExistsRuleTests(TestBase): + """ + The client MUST NOT attempt to delete an exchange that does not exist. + """ + + +class HeadersExchangeTests(TestBase): + """ + Tests for headers exchange functionality. + """ + def setUp(self): + TestBase.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties=headers) + + def myBasicPublish(self, headers): + self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers})) + + def testMatchAll(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + + +class MiscellaneousErrorsTests(TestBase): + """ + Test some miscellaneous error conditions + """ + def testTypeNotKnown(self): + try: + self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") + self.fail("Expected 503 for declaration of unknown exchange type.") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def testDifferentDeclaredType(self): + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") + try: + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") + self.fail("Expected 530 for redeclaration of exchange with different type.") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + #cleanup + other = self.connect() + c2 = other.channel(1) + c2.session_open() + c2.exchange_delete(exchange="test_different_declared_type_exchange") + +class ExchangeTests(TestBase): + def testHeadersBindNoMatchArg(self): + self.channel.queue_declare(queue="q", exclusive=True, auto_delete=True) + try: + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) + self.fail("Expected failure for missing x-match arg.") + except Closed, e: + self.assertConnectionException(541, e.args[0]) diff --git a/python/tests_0-10_preview/execution.py b/python/tests_0-10_preview/execution.py new file mode 100644 index 0000000000..3ff6d8ea65 --- /dev/null +++ b/python/tests_0-10_preview/execution.py @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class ExecutionTests (TestBase): + def test_flush(self): + channel = self.channel + for i in [1, 2, 3]: + channel.message_transfer( + content=Content(properties={'routing_key':str(i)})) + assert(channel.completion.wait(channel.completion.command_id, timeout=1)) diff --git a/python/tests_0-10_preview/message.py b/python/tests_0-10_preview/message.py new file mode 100644 index 0000000000..a3d32bdb2d --- /dev/null +++ b/python/tests_0-10_preview/message.py @@ -0,0 +1,834 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from qpid.reference import Reference, ReferenceId + +class MessageTests(TestBase): + """Tests for 'methods' on the amqp message 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True) + #establish two consumers one of which excludes delivery of locally sent messages + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + def test_consume_no_local_awkward(self): + + """ + If an exclusive queue gets a no-local delivered to it, that + message could 'block' delivery of subsequent messages or it + could be left on the queue, possibly never being consumed + (this is the case for example in the qpid JMS mapping of + topics). This test excercises a Qpid C++ broker hack that + deletes such messages. + """ + + channel = self.channel + #setup: + channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + #establish consumer which excludes delivery of locally sent messages + self.subscribe(destination="local_excluded", queue="test-queue", no_local=True) + + #send a 'local' message + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local")) + + #send a non local message + other = self.connect() + channel2 = other.channel(1) + channel2.session_open() + channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign")) + channel2.session_close() + other.close() + + #check that the second message only is delivered + excluded = self.client.queue("local_excluded") + msg = excluded.get(timeout=1) + self.assertEqual("foreign", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received extra message") + except Empty: None + #check queue is empty + self.assertEqual(0, channel.queue_query(queue="test-queue").message_count) + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + + #check that an exclusive consumer prevents other consumer being created: + self.subscribe(destination="first", queue="test-queue-2", exclusive=True) + try: + self.subscribe(destination="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last consumer: + channel = self.client.channel(2) + channel.session_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + self.subscribe(channel, destination="first", queue="test-queue-2") + try: + self.subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + self.subscribe(queue="invalid-queue", destination="") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.session_open() + try: + #queue not specified and none previously declared for channel: + self.subscribe(channel, queue="", destination="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) + + #check that attempts to use duplicate tags are detected and prevented: + self.subscribe(destination="first", queue="test-queue-3") + try: + self.subscribe(destination="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) + self.subscribe(destination="my-consumer", queue="test-queue-4") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) + + #cancel should stop messages being delivered + channel.message_cancel(destination="my-consumer") + channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two")) + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.content.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + channel.message_cancel(destination="my-consumer") + channel.message_cancel(destination="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True) + + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + msg2.complete(cumulative=True)#One and Two + msg4.complete(cumulative=False) + + channel.message_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + + def test_recover(self): + """ + Test recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="amq.fanout", queue="queue-a") + channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="amq.fanout", queue="queue-b") + + self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) + self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) + confirmed = self.client.queue("confirmed") + unconfirmed = self.client.queue("unconfirmed") + + data = ["One", "Two", "Three", "Four", "Five"] + for d in data: + channel.message_transfer(destination="amq.fanout", content=Content(body=d)) + + for q in [confirmed, unconfirmed]: + for d in data: + self.assertEqual(d, q.get(timeout=1).content.body) + self.assertEmpty(q) + + channel.message_recover(requeue=False) + + self.assertEmpty(confirmed) + + while len(data): + msg = None + for d in data: + msg = unconfirmed.get(timeout=1) + self.assertEqual(d, msg.content.body) + self.assertEqual(True, msg.content['redelivered']) + self.assertEmpty(unconfirmed) + data.remove(msg.content.body) + msg.complete(cumulative=False) + channel.message_recover(requeue=False) + + + def test_recover_requeue(self): + """ + Test requeing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) + + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + msg2.complete(cumulative=True) #One and Two + msg4.complete(cumulative=False) #Four + + channel.message_cancel(destination="consumer_tag") + + #publish a new message + channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) + #requeue unacked messages (Three and Five) + channel.message_recover(requeue=True) + + self.subscribe(queue="test-requeue", destination="consumer_tag") + queue2 = self.client.queue("consumer_tag") + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + self.assertEqual(True, msg3b.content['redelivered']) + self.assertEqual(True, msg5b.content['redelivered']) + + self.assertEqual("Six", queue2.get(timeout=1).content.body) + + try: + extra = queue2.get(timeout=1) + self.fail("Got unexpected message in second queue: " + extra.content.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.content.body) + except Empty: None + + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True) + subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + #set prefetch to 5: + channel.message_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + msg.complete() + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg.complete() + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True) + subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) + queue = self.client.queue("consumer_tag") + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.message_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + msg.complete() + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg.complete() + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) + + def test_reject(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") + channel.queue_declare(queue = "r", exclusive=True, auto_delete=True) + channel.queue_bind(queue = "r", exchange = "amq.fanout") + + self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) + msg = self.client.queue("consumer").get(timeout = 1) + self.assertEquals(msg.content.body, "blah, blah") + channel.message_reject([msg.command_id, msg.command_id]) + + self.subscribe(queue = "r", destination = "checker") + msg = self.client.queue("checker").get(timeout = 1) + self.assertEquals(msg.content.body, "blah, blah") + + def test_credit_flow_messages(self): + """ + Test basic credit based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + channel.message_flow(unit = 0, value = 1, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + def test_credit_flow_bytes(self): + """ + Test basic credit based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #send batch of messages to queue + for i in range(10): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 35 + + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(5): + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(5): + channel.message_flow(unit = 1, value = msg_size, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + + def test_window_flow_messages(self): + """ + Test basic window based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) + channel.message_flow_mode(mode = 1, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + msg = q.get(timeout = 1) + self.assertDataEquals(channel, msg, "Message %d" % i) + self.assertEmpty(q) + + #acknowledge messages and check more are received + msg.complete(cumulative=True) + for i in range(6, 11): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + + def test_window_flow_bytes(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) + channel.message_flow_mode(mode = 1, destination = "c") + #send batch of messages to queue + for i in range(10): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 40 + + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = msg_size*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + msgs = [] + for i in range(5): + msg = q.get(timeout = 1) + msgs.append(msg) + self.assertDataEquals(channel, msg, "abcdefgh") + self.assertEmpty(q) + + #ack each message individually and check more are received + for i in range(5): + msg = msgs.pop() + msg.complete(cumulative=False) + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + def test_subscribe_not_acquired(self): + """ + Test the not-acquired modes works as expected for a simple case + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 6): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + + self.subscribe(queue = "q", destination = "a", acquire_mode = 1) + self.subscribe(queue = "q", destination = "b", acquire_mode = 1) + + for i in range(6, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + + #both subscribers should see all messages + qA = self.client.queue("a") + qB = self.client.queue("b") + for i in range(1, 11): + for q in [qA, qB]: + msg = q.get(timeout = 1) + self.assertEquals("Message %s" % i, msg.content.body) + msg.complete() + + #messages should still be on the queue: + self.assertEquals(10, channel.queue_query(queue = "q").message_count) + + def test_acquire(self): + """ + Test explicit acquire function + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) + + self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) + msg = self.client.queue("a").get(timeout = 1) + #message should still be on the queue: + self.assertEquals(1, channel.queue_query(queue = "q").message_count) + + channel.message_acquire([msg.command_id, msg.command_id]) + #check that we get notification (i.e. message_acquired) + response = channel.control_queue.get(timeout=1) + self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + #message should have been removed from the queue: + self.assertEquals(0, channel.queue_query(queue = "q").message_count) + msg.complete() + + + + + def test_release(self): + """ + Test explicit release function + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) + + self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) + msg = self.client.queue("a").get(timeout = 1) + channel.message_cancel(destination = "a") + channel.message_release([msg.command_id, msg.command_id]) + msg.complete() + + #message should not have been removed from the queue: + self.assertEquals(1, channel.queue_query(queue = "q").message_count) + + def test_release_ordering(self): + """ + Test order of released messages is as expected + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range (1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i))) + + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + channel.message_flow(unit = 0, value = 10, destination = "a") + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + queue = self.client.queue("a") + first = queue.get(timeout = 1) + for i in range (2, 10): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) + last = queue.get(timeout = 1) + self.assertEmpty(queue) + channel.message_release([first.command_id, last.command_id]) + last.complete()#will re-allocate credit, as in window mode + for i in range (1, 11): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) + + def test_ranged_ack(self): + """ + Test acking of messages ranges + """ + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range (1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i))) + + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + channel.message_flow(unit = 0, value = 10, destination = "a") + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + queue = self.client.queue("a") + for i in range (1, 11): + self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body) + self.assertEmpty(queue) + + #ack all but the third message (command id 2) + channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) + channel.message_recover() + self.assertEquals("message 3", queue.get(timeout = 1).content.body) + self.assertEmpty(queue) + + def test_subscribe_not_acquired_2(self): + channel = self.channel + + #publish some messages + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + + #consume some of them + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + channel.message_flow_mode(mode = 0, destination = "a") + channel.message_flow(unit = 0, value = 5, destination = "a") + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + + queue = self.client.queue("a") + for i in range(1, 6): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + self.assertEmpty(queue) + + #now create a not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + + #check it gets those not consumed + queue = self.client.queue("b") + channel.message_flow(unit = 0, value = 1, destination = "b") + for i in range(6, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + channel.message_flow(unit = 0, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, channel.queue_query(queue="q").message_count) + + def test_subscribe_not_acquired_3(self): + channel = self.channel + + #publish some messages + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + + #create a not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + + #browse through messages + queue = self.client.queue("a") + for i in range(1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + if (i % 2): + #try to acquire every second message + channel.message_acquire([msg.command_id, msg.command_id]) + #check that acquire succeeds + response = channel.control_queue.get(timeout=1) + self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + msg.complete() + self.assertEmpty(queue) + + #create a second not-acquired subscriber + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 1, destination = "b") + #check it gets those not consumed + queue = self.client.queue("b") + for i in [2,4,6,8,10]: + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.content.body) + msg.complete() + channel.message_flow(unit = 0, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, channel.queue_query(queue="q").message_count) + + def test_release_unacquired(self): + channel = self.channel + + #create queue + self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True) + + #send message + channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message")) + + #create two 'browsers' + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + queueA = self.client.queue("a") + + channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + channel.message_flow(unit = 0, value = 10, destination = "b") + queueB = self.client.queue("b") + + #have each browser release the message + msgA = queueA.get(timeout = 1) + channel.message_release([msgA.command_id, msgA.command_id]) + + msgB = queueB.get(timeout = 1) + channel.message_release([msgB.command_id, msgB.command_id]) + + #cancel browsers + channel.message_cancel(destination = "a") + channel.message_cancel(destination = "b") + + #create consumer + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + channel.message_flow(unit = 0, value = 10, destination = "c") + queueC = self.client.queue("c") + #consume the message then ack it + msgC = queueC.get(timeout = 1) + msgC.complete() + #ensure there are no other messages + self.assertEmpty(queueC) + + def test_no_size(self): + self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + ch = self.channel + ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body")) + + ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0) + ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d") + ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d") + + queue = self.client.queue("d") + msg = queue.get(timeout = 3) + self.assertEquals("message-body", msg.content.body) + + def assertDataEquals(self, channel, msg, expected): + self.assertEquals(expected, msg.content.body) + + def assertEmpty(self, queue): + try: + extra = queue.get(timeout=1) + self.fail("Queue not empty, contains: " + extra.content.body) + except Empty: None + +class SizelessContent(Content): + + def size(self): + return None diff --git a/python/tests_0-10_preview/persistence.py b/python/tests_0-10_preview/persistence.py new file mode 100644 index 0000000000..ad578474eb --- /dev/null +++ b/python/tests_0-10_preview/persistence.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class PersistenceTests(TestBase): + def test_delete_queue_after_publish(self): + channel = self.channel + channel.synchronous = False + + #create queue + channel.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + for i in range(1, 10): + channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message")) + + channel.synchronous = True + #explicitly delete queue + channel.queue_delete(queue = "q") + + def test_ack_message_from_deleted_queue(self): + channel = self.channel + channel.synchronous = False + + #create queue + channel.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message")) + + #create consumer + channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0) + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + channel.message_flow(unit = 0, value = 10, destination = "a") + queue = self.client.queue("a") + + #consume the message, cancel subscription (triggering auto-delete), then ack it + msg = queue.get(timeout = 5) + channel.message_cancel(destination = "a") + msg.complete() + + def test_queue_deletion(self): + channel = self.channel + channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) + channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz") + channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message")) + channel.queue_delete(queue = "durable-subscriber-queue") + diff --git a/python/tests_0-10_preview/query.py b/python/tests_0-10_preview/query.py new file mode 100644 index 0000000000..eba2ee6dd1 --- /dev/null +++ b/python/tests_0-10_preview/query.py @@ -0,0 +1,227 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class QueryTests(TestBase): + """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" + + def test_exchange_query(self): + """ + Test that the exchange_query method works as expected + """ + channel = self.channel + #check returned type for the standard exchanges + self.assert_type("direct", channel.exchange_query(name="amq.direct")) + self.assert_type("topic", channel.exchange_query(name="amq.topic")) + self.assert_type("fanout", channel.exchange_query(name="amq.fanout")) + self.assert_type("headers", channel.exchange_query(name="amq.match")) + self.assert_type("direct", channel.exchange_query(name="")) + #declare an exchange + channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) + #check that the result of a query is as expected + response = channel.exchange_query(name="my-test-exchange") + self.assert_type("direct", response) + self.assertEqual(False, response.durable) + self.assertEqual(False, response.not_found) + #delete the exchange + channel.exchange_delete(exchange="my-test-exchange") + #check that the query now reports not-found + self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found) + + def assert_type(self, expected_type, response): + self.assertEqual(expected_type, response.__getattr__("type")) + + def test_binding_query_direct(self): + """ + Test that the binding_query method works as expected with the direct exchange + """ + self.binding_query_with_key("amq.direct") + + def test_binding_query_topic(self): + """ + Test that the binding_query method works as expected with the direct exchange + """ + self.binding_query_with_key("amq.topic") + + def binding_query_with_key(self, exchange_name): + channel = self.channel + #setup: create two queues + channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + + channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key") + + # test detection of any binding to specific queue + response = channel.binding_query(exchange=exchange_name, queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test detection of specific binding to any queue + response = channel.binding_query(exchange=exchange_name, routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.key_not_matched) + + # test detection of specific binding to specific queue + response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(False, response.key_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.key_not_matched) + + # test matched queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + # test unmatched queue, matched binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(False, response.key_not_matched) + + # test unmatched queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found) + + + def test_binding_query_fanout(self): + """ + Test that the binding_query method works as expected with fanout exchange + """ + channel = self.channel + #setup + channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="amq.fanout", queue="used-queue") + + # test detection of any binding to specific queue + response = channel.binding_query(exchange="amq.fanout", queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange="amq.fanout", queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found) + + def test_binding_query_header(self): + """ + Test that the binding_query method works as expected with headers exchanges + """ + channel = self.channel + #setup + channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) + + # test detection of any binding to specific queue + response = channel.binding_query(exchange="amq.match", queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test detection of specific binding to any queue + response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.args_not_matched) + + # test detection of specific binding to specific queue + response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(False, response.args_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange="amq.match", queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.args_not_matched) + + # test matched queue, unmatched binding + response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(True, response.args_not_matched) + + # test unmatched queue, matched binding + response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(False, response.args_not_matched) + + # test unmatched queue, unmatched binding + response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.args_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found) + diff --git a/python/tests_0-10_preview/queue.py b/python/tests_0-10_preview/queue.py new file mode 100644 index 0000000000..7b3590d11b --- /dev/null +++ b/python/tests_0-10_preview/queue.py @@ -0,0 +1,338 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class QueueTests(TestBase): + """Tests for 'methods' on the amqp queue 'class'""" + + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + channel = self.channel + #setup, declare a queue and add some messages to it: + channel.exchange_declare(exchange="test-exchange", type="direct") + channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"})) + channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"})) + channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"})) + + #check that the queue now reports 3 messages: + channel.queue_declare(queue="test-queue") + reply = channel.queue_query(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + channel.queue_purge(queue="test-queue"); + reply = channel.queue_query(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) + self.subscribe(queue="test-queue", destination="tag") + queue = self.client.queue("tag") + msg = queue.get(timeout=1) + self.assertEqual("four", msg.content.body) + + #check error conditions (use new channels): + channel = self.client.channel(2) + channel.session_open() + try: + #queue specified but doesn't exist: + channel.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(3) + channel.session_open() + try: + #queue not specified and none previously declared for channel: + channel.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.session_open() + channel.exchange_delete(exchange="test-exchange") + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened channel(1) + c1 = self.channel + # Here we open a second separate connection: + other = self.connect() + c2 = other.channel(1) + c2.session_open() + + #declare an exclusive queue: + c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + try: + #other connection should not be allowed to declare this: + c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except Closed, e: + self.assertChannelException(405, e.args[0]) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + channel = self.channel + #declare an exclusive queue: + channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) + channel.queue_declare(queue="passive-queue-1", passive=True) + try: + #other connection should not be allowed to declare this: + channel.queue_declare(queue="passive-queue-2", passive=True) + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + channel = self.channel + channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + + #straightforward case, both exchange & queue exist so no errors expected: + channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + + #use the queue name where the routing key is not specified: + channel.queue_bind(queue="queue-1", exchange="amq.direct") + + #try and bind to non-existant exchange + try: + channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #need to reopen a channel: + channel = self.client.channel(2) + channel.session_open() + + #try and bind non-existant queue: + try: + channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_unbind_direct(self): + self.unbind_test(exchange="amq.direct", routing_key="key") + + def test_unbind_topic(self): + self.unbind_test(exchange="amq.topic", routing_key="key") + + def test_unbind_fanout(self): + self.unbind_test(exchange="amq.fanout") + + def test_unbind_headers(self): + self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) + + def unbind_test(self, exchange, routing_key="", args=None, headers={}): + #bind two queues and consume from them + channel = self.channel + + channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) + + self.subscribe(queue="queue-1", destination="queue-1") + self.subscribe(queue="queue-2", destination="queue-2") + + queue1 = self.client.queue("queue-1") + queue2 = self.client.queue("queue-2") + + channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) + + #send a message that will match both bindings + channel.message_transfer(destination=exchange, + content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) + + #unbind first queue + channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + + #send another message + channel.message_transfer(destination=exchange, + content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) + + #check one queue has both messages and the other has only one + self.assertEquals("one", queue1.get(timeout=1).content.body) + try: + msg = queue1.get(timeout=1) + self.fail("Got extra message: %s" % msg.content.body) + except Empty: pass + + self.assertEquals("one", queue2.get(timeout=1).content.body) + self.assertEquals("two", queue2.get(timeout=1).content.body) + try: + msg = queue2.get(timeout=1) + self.fail("Got extra message: " + msg) + except Empty: pass + + + def test_delete_simple(self): + """ + Test core queue deletion behaviour + """ + channel = self.channel + + #straight-forward case: + channel.queue_declare(queue="delete-me") + channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) + channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) + channel.queue_delete(queue="delete-me") + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="delete-me", passive=True) + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #check attempted deletion of non-existant queue is handled correctly: + channel = self.client.channel(2) + channel.session_open() + try: + channel.queue_delete(queue="i-dont-exist", if_empty=True) + self.fail("Expected delete of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and add a message to it (use default binding): + channel.queue_declare(queue="delete-me-2") + channel.queue_declare(queue="delete-me-2", passive=True) + channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) + + #try to delete, but only if empty: + try: + channel.queue_delete(queue="delete-me-2", if_empty=True) + self.fail("Expected delete if_empty to fail for non-empty queue") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + #need new channel now: + channel = self.client.channel(2) + channel.session_open() + + #empty queue: + self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") + queue = self.client.queue("consumer_tag") + msg = queue.get(timeout=1) + self.assertEqual("message", msg.content.body) + channel.message_cancel(destination="consumer_tag") + + #retry deletion on empty queue: + channel.queue_delete(queue="delete-me-2", if_empty=True) + + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-2", passive=True) + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and register a consumer: + channel.queue_declare(queue="delete-me-3") + channel.queue_declare(queue="delete-me-3", passive=True) + self.subscribe(destination="consumer_tag", queue="delete-me-3") + + #need new channel now: + channel2 = self.client.channel(2) + channel2.session_open() + #try to delete, but only if empty: + try: + channel2.queue_delete(queue="delete-me-3", if_unused=True) + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + + channel.message_cancel(destination="consumer_tag") + channel.queue_delete(queue="delete-me-3", if_unused=True) + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-3", passive=True) + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_autodelete_shared(self): + """ + Test auto-deletion (of non-exclusive queues) + """ + channel = self.channel + other = self.connect() + channel2 = other.channel(1) + channel2.session_open() + + channel.queue_declare(queue="auto-delete-me", auto_delete=True) + + #consume from both channels + reply = channel.basic_consume(queue="auto-delete-me") + channel2.basic_consume(queue="auto-delete-me") + + #implicit cancel + channel2.session_close() + + #check it is still there + channel.queue_declare(queue="auto-delete-me", passive=True) + + #explicit cancel => queue is now unused again: + channel.basic_cancel(consumer_tag=reply.consumer_tag) + + #NOTE: this assumes there is no timeout in use + + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="auto-delete-me", passive=True) + self.fail("Expected queue to have been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + diff --git a/python/tests_0-10_preview/testlib.py b/python/tests_0-10_preview/testlib.py new file mode 100644 index 0000000000..a0355c4ce0 --- /dev/null +++ b/python/tests_0-10_preview/testlib.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Tests for the testlib itself. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from Queue import Empty + +import sys +from traceback import * + +def mytrace(frame, event, arg): + print_stack(frame); + print "====" + return mytrace + +class TestBaseTest(TestBase): + """Verify TestBase functions work as expected""" + + def testAssertEmptyPass(self): + """Test assert empty works""" + self.queue_declare(queue="empty") + q = self.consume("empty") + self.assertEmpty(q) + try: + q.get(timeout=1) + self.fail("Queue is not empty.") + except Empty: None # Ignore + + def testAssertEmptyFail(self): + self.queue_declare(queue="full") + q = self.consume("full") + self.channel.message_transfer(content=Content("", properties={'routing_key':"full"})) + try: + self.assertEmpty(q); + self.fail("assertEmpty did not assert on non-empty queue") + except AssertionError: None # Ignore + + def testMessageProperties(self): + """Verify properties are passed with message""" + props={"x":1, "y":2} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) + + + diff --git a/python/tests_0-10_preview/tx.py b/python/tests_0-10_preview/tx.py new file mode 100644 index 0000000000..3fd1065af3 --- /dev/null +++ b/python/tests_0-10_preview/tx.py @@ -0,0 +1,231 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class TxTests(TestBase): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + channel2 = self.client.channel(2) + channel2.session_open() + self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel2.tx_commit() + channel2.session_close() + + #use a different channel with new subscriptions to ensure + #there is no redelivery of acked messages: + channel = self.channel + channel.tx_select() + + self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1) + queue_a = self.client.queue("qa") + + self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1) + queue_b = self.client.queue("qb") + + self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1) + queue_c = self.client.queue("qc") + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.content.body) + msg.complete() + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.content.body) + msg.complete() + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.content.body) + msg.complete() + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.tx_commit() + + def test_auto_rollback(self): + """ + Test that a channel closed with an open transaction is effectively rolled back + """ + channel2 = self.client.channel(2) + channel2.session_open() + queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + channel2.session_close() + channel = self.channel + channel.tx_select() + + self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1) + queue_a = self.client.queue("qa") + + self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1) + queue_b = self.client.queue("qb") + + self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1) + queue_c = self.client.queue("qc") + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + msg.complete() + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + msg.complete() + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + msg.complete() + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued) + for d in ["sub_a", "sub_b", "sub_c"]: + channel.message_stop(destination=d) + + channel.tx_rollback() + + #restart susbcriptions + for d in ["sub_a", "sub_b", "sub_c"]: + channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF) + channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF) + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + msg.complete() + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + msg.complete() + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + msg.complete() + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.tx_commit() + + def perform_txn_work(self, channel, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True) + channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True) + channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) + channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + + for i in range(1, 5): + channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i)) + + channel.message_transfer(destination="amq.direct", + content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6")) + channel.message_transfer(destination="amq.topic", + content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7")) + + channel.tx_select() + + #consume and ack messages + self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1) + queue_a = self.client.queue("sub_a") + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg.complete() + + self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1) + queue_b = self.client.queue("sub_b") + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + msg.complete() + + sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1) + queue_c = self.client.queue("sub_c") + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + msg.complete() + + #publish messages + for i in range(1, 5): + channel.message_transfer(destination="amq.topic", + content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i}, + body="TxMessage %d" % i)) + + channel.message_transfer(destination="amq.direct", + content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"}, + body="TxMessage 6")) + channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"}, + body="TxMessage 7")) + return queue_a, queue_b, queue_c |