diff options
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py')
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py new file mode 100644 index 0000000000..8cbb5793d9 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py @@ -0,0 +1,353 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import traceback +from qpid.queue import Empty +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 +from qpid.session import SessionException + +class AlternateExchangeTests(TestBase010): + """ + Tests for the new mechanism for message returns introduced in 0-10 + and available in 0-9 for preview + """ + + def test_unroutable(self): + """ + Test that unroutable messages are delivered to the alternate-exchange if specified + """ + session = self.session + #create an exchange with an alternate defined + session.exchange_declare(exchange="secondary", type="fanout") + session.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary") + + #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages + session.queue_declare(queue="returns", exclusive=True, auto_delete=True) + session.exchange_bind(queue="returns", exchange="secondary") + session.message_subscribe(destination="a", queue="returns") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + returned = session.incoming("a") + + #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages + session.queue_declare(queue="processed", exclusive=True, auto_delete=True) + session.exchange_bind(queue="processed", exchange="primary", binding_key="my-key") + session.message_subscribe(destination="b", queue="processed") + session.message_flow(destination="b", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="b", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + processed = session.incoming("b") + + #publish to the primary exchange + #...one message that makes it to the 'processed' queue: + dp=self.session.delivery_properties(routing_key="my-key") + session.message_transfer(destination="primary", message=Message(dp, "Good")) + #...and one that does not: + dp=self.session.delivery_properties(routing_key="unused-key") + session.message_transfer(destination="primary", message=Message(dp, "Bad")) + + #delete the exchanges + session.exchange_delete(exchange="primary") + session.exchange_delete(exchange="secondary") + + #verify behaviour + self.assertEqual("Good", processed.get(timeout=1).body) + self.assertEqual("Bad", returned.get(timeout=1).body) + self.assertEmpty(processed) + self.assertEmpty(returned) + + def test_queue_delete(self): + """ + Test that messages in a queue being deleted are delivered to the alternate-exchange if specified + """ + session = self.session + #set up a 'dead letter queue': + dlq = self.setup_dlq() + + #create a queue using the dlq as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="dlq") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + session.message_transfer(message=Message(dp, "One")) + session.message_transfer(message=Message(dp, "Two")) + session.message_transfer(message=Message(dp, "Three")) + #delete it: + session.queue_delete(queue="delete-me") + #delete the dlq exchange: + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + self.assertEqual("One", dlq.get(timeout=1).body) + self.assertEqual("Two", dlq.get(timeout=1).body) + self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_delete_while_used_by_queue(self): + """ + Ensure an exchange still in use as an alternate-exchange for a + queue can't be deleted + """ + session = self.session + session.exchange_declare(exchange="alternate", type="fanout") + + session2 = self.conn.session("alternate", 2) + session2.queue_declare(queue="q", alternate_exchange="alternate") + try: + session2.exchange_delete(exchange="alternate") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except SessionException, e: + session = self.session + session.queue_delete(queue="q") + session.exchange_delete(exchange="alternate") + self.assertEquals(530, e.args[0].error_code) + + + def test_delete_while_used_by_exchange(self): + """ + Ensure an exchange still in use as an alternate-exchange for + another exchange can't be deleted + """ + session = self.session + session.exchange_declare(exchange="alternate", type="fanout") + + session = self.conn.session("alternate", 2) + session.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate") + try: + session.exchange_delete(exchange="alternate") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except SessionException, e: + session = self.session + session.exchange_delete(exchange="e") + session.exchange_delete(exchange="alternate") + self.assertEquals(530, e.args[0].error_code) + + + def test_modify_existing_exchange_alternate(self): + """ + Ensure that attempting to modify an exhange to change + the alternate throws an exception + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="alt2", type="direct") + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") + try: + # attempt to change the alternate on an already existing exchange + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2") + self.fail("Expected changing an alternate on an existing exchange to fail") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + session = self.conn.session("alternate", 2) + session.exchange_delete(exchange="onealternate") + session.exchange_delete(exchange="alt2") + session.exchange_delete(exchange="alt1") + + + def test_add_alternate_to_exchange(self): + """ + Ensure that attempting to modify an exhange by adding + an alternate throws an exception + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="noalternate", type="fanout") + try: + # attempt to add an alternate on an already existing exchange + session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1") + self.fail("Expected adding an alternate on an existing exchange to fail") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + session = self.conn.session("alternate", 2) + session.exchange_delete(exchange="noalternate") + session.exchange_delete(exchange="alt1") + + + def test_del_alternate_to_exchange(self): + """ + Ensure that attempting to modify an exhange by declaring + it again without an alternate does nothing + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") + # attempt to re-declare without an alternate - silently ignore + session.exchange_declare(exchange="onealternate", type="fanout" ) + session.exchange_delete(exchange="onealternate") + session.exchange_delete(exchange="alt1") + + def test_queue_autodelete(self): + """ + Test that messages in a queue being auto-deleted are delivered + to the alternate-exchange if specified, including messages + that are acquired but not accepted + """ + session = self.session + #set up a 'dead letter queue': + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") + + #on a separate session, create an auto-deleted queue using the + #dlq as its alternate exchange (handling of auto-delete is + #different for exclusive and non-exclusive queues, so test + #both modes): + for mode in [True, False]: + session2 = self.conn.session("another-session") + session2.queue_declare(queue="my-queue", alternate_exchange="dlq", exclusive=mode, auto_delete=True) + #send it some messages: + dp=session2.delivery_properties(routing_key="my-queue") + session2.message_transfer(message=Message(dp, "One")) + session2.message_transfer(message=Message(dp, "Two")) + session2.message_transfer(message=Message(dp, "Three")) + session2.message_subscribe(destination="incoming", queue="my-queue") + session2.message_flow(destination="incoming", unit=session.credit_unit.message, value=1) + session2.message_flow(destination="incoming", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + self.assertEqual("One", session2.incoming("incoming").get(timeout=1).body) + session2.close() + + #check the messages were delivered to the dlq: + self.assertEqual("One", dlq.get(timeout=1).body) + self.assertEqual("Two", dlq.get(timeout=1).body) + self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_queue_delete_loop(self): + """ + Test that if a queue is bound to its own alternate exchange, + then on deletion there is no infinite looping + """ + session = self.session + dlq = self.setup_dlq() + + #create a queue using the dlq as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="dlq") + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delete-me") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + #delete it: + session.queue_delete(queue="delete-me") + #cleanup: + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_queue_delete_no_match(self): + """ + Test that on queue deletion, if the queues own alternate + exchange cannot find a match for the message, the + alternate-exchange of that exchange will be tried. Note: + though the spec rules out going to the alternate-exchanges + alternate exchange when sending to an exchange, it does not + cover this case. + """ + session = self.session + dlq = self.setup_dlq() + + #setu up an 'intermediary' exchange + session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") + + #create a queue using the intermediary as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="my-exchange") + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delete-me") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + + #delete it: + session.queue_delete(queue="delete-me") + #cleanup: + session.exchange_delete(exchange="my-exchange") + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_reject_no_match(self): + """ + Test that on rejecting a message, if the queues own alternate + exchange cannot find a match for the message, the + alternate-exchange of that exchange will be tried. Note: + though the spec rules out going to the alternate-exchanges + alternate exchange when sending to an exchange, it does not + cover this case. + """ + session = self.session + dlq = self.setup_dlq() + + #setu up an 'intermediary' exchange + session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq") + + #create a queue using the intermediary as its alternate exchange: + session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True) + #bind that queue to the dlq as well: + session.exchange_bind(exchange="dlq", queue="delivery-queue") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delivery-queue") + for m in ["One", "Two", "Three"]: + session.message_transfer(message=Message(dp, m)) + + #get and reject those messages: + session.message_subscribe(destination="a", queue="delivery-queue") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + incoming = session.incoming("a") + for m in ["One", "Two", "Three"]: + msg = incoming.get(timeout=1) + self.assertEqual(m, msg.body) + session.message_reject(RangedSet(msg.id)) + session.message_cancel(destination="a") + + #check the messages were delivered to the dlq: + for m in ["One", "Two", "Three"]: + self.assertEqual(m, dlq.get(timeout=1).body) + self.assertEmpty(dlq) + #cleanup: + session.exchange_delete(exchange="my-exchange") + session.exchange_delete(exchange="dlq") + + def setup_dlq(self): + session = self.session + #set up 'dead-letter' handling: + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") + return dlq + + def assertEmpty(self, queue): + try: + msg = queue.get(timeout=1) + self.fail("Queue not empty: " + msg) + except Empty: None |