diff options
Diffstat (limited to 'Final/python/tests')
-rw-r--r-- | Final/python/tests/__init__.py | 20 | ||||
-rw-r--r-- | Final/python/tests/basic.py | 431 | ||||
-rw-r--r-- | Final/python/tests/broker.py | 122 | ||||
-rw-r--r-- | Final/python/tests/example.py | 94 | ||||
-rw-r--r-- | Final/python/tests/exchange.py | 327 | ||||
-rw-r--r-- | Final/python/tests/queue.py | 255 | ||||
-rw-r--r-- | Final/python/tests/testlib.py | 66 | ||||
-rw-r--r-- | Final/python/tests/tx.py | 209 |
8 files changed, 1524 insertions, 0 deletions
diff --git a/Final/python/tests/__init__.py b/Final/python/tests/__init__.py new file mode 100644 index 0000000000..9a09d2d04f --- /dev/null +++ b/Final/python/tests/__init__.py @@ -0,0 +1,20 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/Final/python/tests/basic.py b/Final/python/tests/basic.py new file mode 100644 index 0000000000..bbbfa8ebf9 --- /dev/null +++ b/Final/python/tests/basic.py @@ -0,0 +1,431 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BasicTests(TestBase): + """Tests for 'methods' on the amqp basic 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True) + #establish two consumers one of which excludes delivery of locally sent messages + channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") + channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) + channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True) + + #check that an exclusive consumer prevents other consumer being created: + channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last consumer: + channel = self.client.channel(2) + channel.channel_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + channel.basic_consume(consumer_tag="first", queue="test-queue-2") + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + channel.basic_consume(queue="invalid-queue") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.basic_consume(queue="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True) + + #check that attempts to use duplicate tags are detected and prevented: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + try: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True) + channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") + channel.basic_publish(routing_key="test-queue-4", content=Content("One")) + + #cancel should stop messages being delivered + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.content.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_cancel(consumer_tag="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue", exclusive=True) + + reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) + queue = self.client.queue(reply.consumer_tag) + + channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + def test_recover_requeue(self): + """ + Test requeing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue", exclusive=True) + + subscription = channel.basic_consume(queue="test-requeue", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + channel.basic_publish(routing_key="test-requeue", content=Content("One")) + channel.basic_publish(routing_key="test-requeue", content=Content("Two")) + channel.basic_publish(routing_key="test-requeue", content=Content("Three")) + channel.basic_publish(routing_key="test-requeue", content=Content("Four")) + channel.basic_publish(routing_key="test-requeue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_cancel(consumer_tag=subscription.consumer_tag) + subscription2 = channel.basic_consume(queue="test-requeue") + queue2 = self.client.queue(subscription2.consumer_tag) + + channel.basic_recover(requeue=True) + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + self.assertEqual(True, msg3b.redelivered) + self.assertEqual(True, msg5b.redelivered) + + try: + extra = queue2.get(timeout=1) + self.fail("Got unexpected message in second queue: " + extra.content.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.content.body) + except Empty: None + + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 5: + channel.basic_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.basic_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) + + def test_get(self): + """ + Test basic_get method + """ + channel = self.channel + channel.queue_declare(queue="test-get", exclusive=True) + + #publish some messages (no_ack=True) with persistent messaging + for i in range(1, 11): + msg=Content("Message %d" % i) + msg["delivery mode"] = 2 + channel.basic_publish(routing_key="test-get",content=msg ) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(1, 11): + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + + #publish some messages (no_ack=True) transient messaging + for i in range(11, 21): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(11, 21): + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + #repeat for no_ack=False + + #publish some messages (no_ack=False) with persistent messaging + for i in range(21, 31): + msg=Content("Message %d" % i) + msg["delivery mode"] = 2 + channel.basic_publish(routing_key="test-get",content=msg ) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(21, 31): + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + #public some messages (no_ack=False) with transient messaging + for i in range(31, 41): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + for i in range(31, 41): + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + if(i == 33): + channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) + if(i in [35, 37, 39]): + channel.basic_ack(delivery_tag=reply.delivery_tag) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + #recover(requeue=True) + channel.basic_recover(requeue=True) + + #get the unacked messages again (34, 36, 38, 40) + for i in [34, 36, 38, 40]: + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + channel.basic_ack(delivery_tag=reply.delivery_tag) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + channel.basic_recover(requeue=True) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") diff --git a/Final/python/tests/broker.py b/Final/python/tests/broker.py new file mode 100644 index 0000000000..90009b6847 --- /dev/null +++ b/Final/python/tests/broker.py @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BrokerTests(TestBase): + """Tests for basic Broker functionality""" + + def test_amqp_basic_13(self): + """ + First, this test tries to receive a message with a no-ack + consumer. Second, this test tries to explicitely receive and + acknowledge a message with an acknowledging consumer. + """ + ch = self.channel + self.queue_declare(ch, queue = "myqueue") + + # No ack consumer + ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag + body = "test no-ack" + ch.basic_publish(routing_key = "myqueue", content = Content(body)) + msg = self.client.queue(ctag).get(timeout = 5) + self.assert_(msg.content.body == body) + + # Acknowleding consumer + self.queue_declare(ch, queue = "otherqueue") + ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag + body = "test ack" + ch.basic_publish(routing_key = "otherqueue", content = Content(body)) + msg = self.client.queue(ctag).get(timeout = 5) + ch.basic_ack(delivery_tag = msg.delivery_tag) + self.assert_(msg.content.body == body) + + def test_basic_delivery_immediate(self): + """ + Test basic message delivery where consume is issued before publish + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + + body = "Immediate Delivery" + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + # TODO: Ensure we fail if immediate=True and there's no consumer. + + + def test_basic_delivery_queued(self): + """ + Test basic message delivery where publish is issued before consume + (i.e. requires queueing of the message) + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + body = "Queued Delivery" + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + def test_invalid_channel(self): + channel = self.client.channel(200) + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for invalid channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + def test_closed_channel(self): + channel = self.client.channel(200) + channel.channel_open() + channel.channel_close() + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for closed channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + def test_channel_flow(self): + channel = self.channel + channel.queue_declare(queue="flow_test_queue", exclusive=True) + channel.basic_consume(consumer_tag="my-tag", queue="flow_test_queue") + incoming = self.client.queue("my-tag") + + channel.channel_flow(active=False) + channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz")) + try: + incoming.get(timeout=1) + self.fail("Received message when flow turned off.") + except Empty: None + + channel.channel_flow(active=True) + msg = incoming.get(timeout=1) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) + + diff --git a/Final/python/tests/example.py b/Final/python/tests/example.py new file mode 100644 index 0000000000..bc84f002e0 --- /dev/null +++ b/Final/python/tests/example.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class ExampleTest (TestBase): + """ + An example Qpid test, illustrating the unittest frameowkr and the + python Qpid client. The test class must inherit TestCase. The + test code uses the Qpid client to interact with a qpid broker and + verify it behaves as expected. + """ + + def test_example(self): + """ + An example test. Note that test functions must start with 'test_' + to be recognized by the test framework. + """ + + # By inheriting TestBase, self.client is automatically connected + # and self.channel is automatically opened as channel(1) + # Other channel methods mimic the protocol. + channel = self.channel + + # Now we can send regular commands. If you want to see what the method + # arguments mean or what other commands are available, you can use the + # python builtin help() method. For example: + #help(chan) + #help(chan.exchange_declare) + + # If you want browse the available protocol methods without being + # connected to a live server you can use the amqp-doc utility: + # + # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] + # + # Options: + # -e, --regexp use regex instead of glob when matching + + # Now that we know what commands are available we can use them to + # interact with the server. + + # Here we use ordinal arguments. + self.exchange_declare(channel, 0, "test", "direct") + + # Here we use keyword arguments. + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") + + # Call Channel.basic_consume to register as a consumer. + # All the protocol methods return a message object. The message object + # has fields corresponding to the reply method fields, plus a content + # field that is filled if the reply includes content. In this case the + # interesting field is the consumer_tag. + reply = channel.basic_consume(queue="test-queue") + + # We can use the Client.queue(...) method to access the queue + # corresponding to our consumer_tag. + queue = self.client.queue(reply.consumer_tag) + + # Now lets publish a message and see if our consumer gets it. To do + # this we need to import the Content class. + body = "Hello World!" + channel.basic_publish(exchange="test", + routing_key="key", + content=Content(body)) + + # Now we'll wait for the message to arrive. We can use the timeout + # argument in case the server hangs. By default queue.get() will wait + # until a message arrives or the connection to the server dies. + msg = queue.get(timeout=10) + + # And check that we got the right response with assertEqual + self.assertEqual(body, msg.content.body) + + # Now acknowledge the message. + channel.basic_ack(msg.delivery_tag, True) + diff --git a/Final/python/tests/exchange.py b/Final/python/tests/exchange.py new file mode 100644 index 0000000000..56d6fa82e4 --- /dev/null +++ b/Final/python/tests/exchange.py @@ -0,0 +1,327 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Tests for exchange behaviour. + +Test classes ending in 'RuleTests' are derived from rules in amqp.xml. +""" + +import Queue, logging +from qpid.testlib import TestBase +from qpid.content import Content +from qpid.client import Closed + + +class StandardExchangeVerifier: + """Verifies standard exchange behavior. + + Used as base class for classes that test standard exchanges.""" + + def verifyDirectExchange(self, ex): + """Verify that ex behaves like a direct exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + + def verifyFanOutExchange(self, ex): + """Verify that ex behaves like a fanout exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex) + self.queue_declare(queue="p") + self.channel.queue_bind(queue="p", exchange=ex) + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + + def verifyTopicExchange(self, ex): + """Verify that ex behaves like a topic exchange""" + self.queue_declare(queue="a") + self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + # Shouldn't match + self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") + self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") + self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.assert_(q.empty()) + + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) + self.channel.basic_publish(exchange=ex) # No headers, won't deliver + self.assertEmpty(q); + + +class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server SHOULD implement these standard exchange types: topic, headers. + + Client attempts to declare an exchange with each of these standard types. + """ + + def testDirect(self): + """Declare and test a direct exchange""" + self.exchange_declare(0, exchange="d", type="direct") + self.verifyDirectExchange("d") + + def testFanout(self): + """Declare and test a fanout exchange""" + self.exchange_declare(0, exchange="f", type="fanout") + self.verifyFanOutExchange("f") + + def testTopic(self): + """Declare and test a topic exchange""" + self.exchange_declare(0, exchange="t", type="topic") + self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") + + +class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance is amq. followed by the exchange type name. + + Client creates a temporary queue and attempts to bind to each required + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if + those types are defined). + """ + def testAmqDirect(self): self.verifyDirectExchange("amq.direct") + + def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") + + def testAmqTopic(self): self.verifyTopicExchange("amq.topic") + + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") + +class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST predeclare a direct exchange to act as the default exchange + for content Publish methods and for default queue bindings. + + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + """ + def testDefaultExchange(self): + # Test automatic binding by queue name. + self.queue_declare(queue="d") + self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default queue + self.verifyDirectExchange("") + + +# TODO aconway 2006-09-27: Fill in empty tests: + +class DefaultAccessRuleTests(TestBase): + """ + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + """ + +class ExtensionsRuleTests(TestBase): + """ + The server MAY implement other exchange types as wanted. + """ + + +class DeclareMethodMinimumRuleTests(TestBase): + """ + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + The client creates as many exchanges as it can until the server reports + an error; the number of exchanges successfuly created must be at least + sixteen. + """ + + +class DeclareMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access to + the realm in which the exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeclareMethodExchangeFieldReservedRuleTests(TestBase): + """ + Exchange names starting with "amq." are reserved for predeclared and + standardised exchanges. The client MUST NOT attempt to create an exchange + starting with "amq.". + + + """ + + +class DeclareMethodTypeFieldTypedRuleTests(TestBase): + """ + Exchanges cannot be redeclared with different types. The client MUST not + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + """ + + +class DeclareMethodTypeFieldSupportRuleTests(TestBase): + """ + The client MUST NOT attempt to create an exchange with a type that the + server does not support. + + + """ + + +class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): + """ + If set, and the exchange does not already exist, the server MUST raise a + channel exception with reply code 404 (not found). + """ + def test(self): + try: + self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) + self.fail("Expected 404 for passive declaration of unknown exchange.") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + +class DeclareMethodDurableFieldSupportRuleTests(TestBase): + """ + The server MUST support both durable and transient exchanges. + + + """ + + +class DeclareMethodDurableFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the durable field if the exchange already exists. + + + """ + + +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the auto-delete field if the exchange already + exists. + + + """ + + +class DeleteMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access + rights to the exchange's access realm. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeleteMethodExchangeFieldExistsRuleTests(TestBase): + """ + The client MUST NOT attempt to delete an exchange that does not exist. + """ + + +class HeadersExchangeTests(TestBase): + """ + Tests for headers exchange functionality. + """ + def setUp(self): + TestBase.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) + + def myBasicPublish(self, headers): + self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) + + def testMatchAll(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + + +class MiscellaneousErrorsTests(TestBase): + """ + Test some miscellaneous error conditions + """ + def testTypeNotKnown(self): + try: + self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") + self.fail("Expected 503 for declaration of unknown exchange type.") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def testDifferentDeclaredType(self): + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") + try: + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") + self.fail("Expected 530 for redeclaration of exchange with different type.") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + #cleanup + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + c2.exchange_delete(exchange="test_different_declared_type_exchange") + diff --git a/Final/python/tests/queue.py b/Final/python/tests/queue.py new file mode 100644 index 0000000000..60ac4c3dfb --- /dev/null +++ b/Final/python/tests/queue.py @@ -0,0 +1,255 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class QueueTests(TestBase): + """Tests for 'methods' on the amqp queue 'class'""" + + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + channel = self.channel + #setup, declare a queue and add some messages to it: + channel.exchange_declare(exchange="test-exchange", type="direct") + channel.queue_declare(queue="test-queue", exclusive=True) + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) + + #check that the queue now reports 3 messages: + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + reply = channel.queue_purge(queue="test-queue"); + self.assertEqual(3, reply.message_count) + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=1) + self.assertEqual("four", msg.content.body) + + #check error conditions (use new channels): + channel = self.client.channel(2) + channel.channel_open() + try: + #queue specified but doesn't exist: + channel.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(3) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.exchange_delete(exchange="test-exchange") + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened channel(1) + c1 = self.channel + # Here we open a second separate connection: + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + + #declare an exclusive queue: + c1.queue_declare(queue="exclusive-queue", exclusive="True") + try: + #other connection should not be allowed to declare this: + c2.queue_declare(queue="exclusive-queue", exclusive="True") + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except Closed, e: + self.assertChannelException(405, e.args[0]) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + channel = self.channel + #declare an exclusive queue: + channel.queue_declare(queue="passive-queue-1", exclusive="True") + channel.queue_declare(queue="passive-queue-1", passive="True") + try: + #other connection should not be allowed to declare this: + channel.queue_declare(queue="passive-queue-2", passive="True") + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + channel = self.channel + channel.queue_declare(queue="queue-1", exclusive="True") + + #straightforward case, both exchange & queue exist so no errors expected: + channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + + #bind the default queue for the channel (i.e. last one declared): + channel.queue_bind(exchange="amq.direct", routing_key="key2") + + #use the queue name where neither routing key nor queue are specified: + channel.queue_bind(exchange="amq.direct") + + #try and bind to non-existant exchange + try: + channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #need to reopen a channel: + channel = self.client.channel(2) + channel.channel_open() + + #try and bind non-existant queue: + try: + channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_delete_simple(self): + """ + Test basic queue deletion + """ + channel = self.channel + + #straight-forward case: + channel.queue_declare(queue="delete-me") + channel.basic_publish(routing_key="delete-me", content=Content("a")) + channel.basic_publish(routing_key="delete-me", content=Content("b")) + channel.basic_publish(routing_key="delete-me", content=Content("c")) + reply = channel.queue_delete(queue="delete-me") + self.assertEqual(3, reply.message_count) + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="delete-me", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #check attempted deletion of non-existant queue is handled correctly: + channel = self.client.channel(2) + channel.channel_open() + try: + channel.queue_delete(queue="i-dont-exist", if_empty="True") + self.fail("Expected delete of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and add a message to it (use default binding): + channel.queue_declare(queue="delete-me-2") + channel.queue_declare(queue="delete-me-2", passive="True") + channel.basic_publish(routing_key="delete-me-2", content=Content("message")) + + #try to delete, but only if empty: + try: + channel.queue_delete(queue="delete-me-2", if_empty="True") + self.fail("Expected delete if_empty to fail for non-empty queue") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + #need new channel now: + channel = self.client.channel(2) + channel.channel_open() + + #empty queue: + reply = channel.basic_consume(queue="delete-me-2", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=1) + self.assertEqual("message", msg.content.body) + channel.basic_cancel(consumer_tag=reply.consumer_tag) + + #retry deletion on empty queue: + channel.queue_delete(queue="delete-me-2", if_empty="True") + + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-2", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and register a consumer: + channel.queue_declare(queue="delete-me-3") + channel.queue_declare(queue="delete-me-3", passive="True") + reply = channel.basic_consume(queue="delete-me-3", no_ack=True) + + #need new channel now: + channel2 = self.client.channel(2) + channel2.channel_open() + #try to delete, but only if empty: + try: + channel2.queue_delete(queue="delete-me-3", if_unused="True") + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + + channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.queue_delete(queue="delete-me-3", if_unused="True") + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-3", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + diff --git a/Final/python/tests/testlib.py b/Final/python/tests/testlib.py new file mode 100644 index 0000000000..cab07cc4ac --- /dev/null +++ b/Final/python/tests/testlib.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Tests for the testlib itself. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from Queue import Empty + +import sys +from traceback import * + +def mytrace(frame, event, arg): + print_stack(frame); + print "====" + return mytrace + +class TestBaseTest(TestBase): + """Verify TestBase functions work as expected""" + + def testAssertEmptyPass(self): + """Test assert empty works""" + self.queue_declare(queue="empty") + q = self.consume("empty") + self.assertEmpty(q) + try: + q.get(timeout=1) + self.fail("Queue is not empty.") + except Empty: None # Ignore + + def testAssertEmptyFail(self): + self.queue_declare(queue="full") + q = self.consume("full") + self.channel.basic_publish(routing_key="full") + try: + self.assertEmpty(q); + self.fail("assertEmpty did not assert on non-empty queue") + except AssertionError: None # Ignore + + def testMessageProperties(self): + """Verify properties are passed with message""" + props={"headers":{"x":1, "y":2}} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) + + + diff --git a/Final/python/tests/tx.py b/Final/python/tests/tx.py new file mode 100644 index 0000000000..054fb8d8b7 --- /dev/null +++ b/Final/python/tests/tx.py @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class TxTests(TestBase): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel.tx_commit() + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.content.body) + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def test_auto_rollback(self): + """ + Test that a channel closed with an open transaction is effectively rolled back + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def perform_txn_work(self, channel, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + channel.queue_declare(queue=name_a, exclusive=True) + channel.queue_declare(queue=name_b, exclusive=True) + channel.queue_declare(queue=name_c, exclusive=True) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) + channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + + for i in range(1, 5): + channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) + + channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) + channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) + + channel.tx_select() + + #consume and ack messages + sub_a = channel.basic_consume(queue=name_a, no_ack=False) + queue_a = self.client.queue(sub_a.consumer_tag) + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + sub_b = channel.basic_consume(queue=name_b, no_ack=False) + queue_b = self.client.queue(sub_b.consumer_tag) + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag) + + sub_c = channel.basic_consume(queue=name_c, no_ack=False) + queue_c = self.client.queue(sub_c.consumer_tag) + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag) + + #publish messages + for i in range(1, 5): + channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) + + channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) + channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) + + return queue_a, queue_b, queue_c + + def test_commit_overlapping_acks(self): + """ + Test that logically 'overlapping' acks do not cause errors on commit + """ + channel = self.channel + channel.queue_declare(queue="commit-overlapping", exclusive=True) + for i in range(1, 10): + channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) + + + channel.tx_select() + + sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) + queue = self.client.queue(sub.consumer_tag) + for i in range(1, 10): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + if i in [3, 6, 10]: + channel.basic_ack(delivery_tag=msg.delivery_tag) + + channel.tx_commit() + + #check all have been acked: + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None |