summaryrefslogtreecommitdiff
path: root/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py')
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py125
1 files changed, 117 insertions, 8 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
index 0ffeb57172..8cbb5793d9 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
@@ -18,7 +18,7 @@
#
import traceback
from qpid.queue import Empty
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.session import SessionException
@@ -77,13 +77,7 @@ class AlternateExchangeTests(TestBase010):
"""
session = self.session
#set up a 'dead letter queue':
- session.exchange_declare(exchange="dlq", type="fanout")
- session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="dlq", queue="deleted")
- session.message_subscribe(destination="dlq", queue="deleted")
- session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- dlq = session.incoming("dlq")
+ dlq = self.setup_dlq()
#create a queue using the dlq as its alternate exchange:
session.queue_declare(queue="delete-me", alternate_exchange="dlq")
@@ -236,6 +230,121 @@ class AlternateExchangeTests(TestBase010):
self.assertEqual("Three", dlq.get(timeout=1).body)
self.assertEmpty(dlq)
+ def test_queue_delete_loop(self):
+ """
+ Test that if a queue is bound to its own alternate exchange,
+ then on deletion there is no infinite looping
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #create a queue using the dlq as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="dlq")
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delete-me")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #cleanup:
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_queue_delete_no_match(self):
+ """
+ Test that on queue deletion, if the queues own alternate
+ exchange cannot find a match for the message, the
+ alternate-exchange of that exchange will be tried. Note:
+ though the spec rules out going to the alternate-exchanges
+ alternate exchange when sending to an exchange, it does not
+ cover this case.
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #setu up an 'intermediary' exchange
+ session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
+
+ #create a queue using the intermediary as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="my-exchange")
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delete-me")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #cleanup:
+ session.exchange_delete(exchange="my-exchange")
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_reject_no_match(self):
+ """
+ Test that on rejecting a message, if the queues own alternate
+ exchange cannot find a match for the message, the
+ alternate-exchange of that exchange will be tried. Note:
+ though the spec rules out going to the alternate-exchanges
+ alternate exchange when sending to an exchange, it does not
+ cover this case.
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #setu up an 'intermediary' exchange
+ session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
+
+ #create a queue using the intermediary as its alternate exchange:
+ session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delivery-queue")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delivery-queue")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+
+ #get and reject those messages:
+ session.message_subscribe(destination="a", queue="delivery-queue")
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ incoming = session.incoming("a")
+ for m in ["One", "Two", "Three"]:
+ msg = incoming.get(timeout=1)
+ self.assertEqual(m, msg.body)
+ session.message_reject(RangedSet(msg.id))
+ session.message_cancel(destination="a")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+ #cleanup:
+ session.exchange_delete(exchange="my-exchange")
+ session.exchange_delete(exchange="dlq")
+
+ def setup_dlq(self):
+ session = self.session
+ #set up 'dead-letter' handling:
+ session.exchange_declare(exchange="dlq", type="fanout")
+ session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="dlq", queue="deleted")
+ session.message_subscribe(destination="dlq", queue="deleted")
+ session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ dlq = session.incoming("dlq")
+ return dlq
def assertEmpty(self, queue):
try: