summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-03-03 12:45:25 +0000
committerGordon Sim <gsim@apache.org>2011-03-03 12:45:25 +0000
commit53a0c88af1f7afd3a0827f8ca586450a0456c40b (patch)
treea525029936106396e5ed185ece5c2ba8dfb3476d
parentc1979c9f419cc57649196e5dc30b0d36cec5c6c5 (diff)
downloadqpid-python-53a0c88af1f7afd3a0827f8ca586450a0456c40b.tar.gz
QPID-3107: If queue's alternate-exchange can't route message, try that exchange's alternate-exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1076604 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp3
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py125
5 files changed, 130 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 64760bea36..58dcc6d7c7 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -135,7 +135,7 @@ void DeliveryRecord::reject()
Exchange::shared_ptr alternate = queue->getAlternateExchange();
if (alternate) {
DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
+ alternate->routeWithAlternate(delivery);
QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
<< alternate->getName());
} else {
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index d143471559..2c7589e4b6 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -342,3 +342,12 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
msg->getProperties<DeliveryProperties>()->setExchange(getName());
}
+
+bool Exchange::routeWithAlternate(Deliverable& msg)
+{
+ route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ if (!msg.delivered && alternate) {
+ alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ }
+ return msg.delivered;
+}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 3c8b5ca2cd..9c4e6be192 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -222,6 +222,8 @@ public:
*/
void recoveryComplete(ExchangeRegistry& exchanges);
+ bool routeWithAlternate(Deliverable& message);
+
protected:
qpid::sys::Mutex bridgeLock;
std::vector<DynamicBridge*> bridgeVector;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index edae8dc24f..29b3c0d585 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -834,8 +834,7 @@ void Queue::destroyed()
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
- alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
- msg.getMessage().getApplicationHeaders());
+ alternateExchange->routeWithAlternate(msg);
popAndDequeue();
}
alternateExchange->decAlternateUsers();
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
index 0ffeb57172..8cbb5793d9 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
@@ -18,7 +18,7 @@
#
import traceback
from qpid.queue import Empty
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.session import SessionException
@@ -77,13 +77,7 @@ class AlternateExchangeTests(TestBase010):
"""
session = self.session
#set up a 'dead letter queue':
- session.exchange_declare(exchange="dlq", type="fanout")
- session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="dlq", queue="deleted")
- session.message_subscribe(destination="dlq", queue="deleted")
- session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- dlq = session.incoming("dlq")
+ dlq = self.setup_dlq()
#create a queue using the dlq as its alternate exchange:
session.queue_declare(queue="delete-me", alternate_exchange="dlq")
@@ -236,6 +230,121 @@ class AlternateExchangeTests(TestBase010):
self.assertEqual("Three", dlq.get(timeout=1).body)
self.assertEmpty(dlq)
+ def test_queue_delete_loop(self):
+ """
+ Test that if a queue is bound to its own alternate exchange,
+ then on deletion there is no infinite looping
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #create a queue using the dlq as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="dlq")
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delete-me")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #cleanup:
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_queue_delete_no_match(self):
+ """
+ Test that on queue deletion, if the queues own alternate
+ exchange cannot find a match for the message, the
+ alternate-exchange of that exchange will be tried. Note:
+ though the spec rules out going to the alternate-exchanges
+ alternate exchange when sending to an exchange, it does not
+ cover this case.
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #setu up an 'intermediary' exchange
+ session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
+
+ #create a queue using the intermediary as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="my-exchange")
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delete-me")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #cleanup:
+ session.exchange_delete(exchange="my-exchange")
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_reject_no_match(self):
+ """
+ Test that on rejecting a message, if the queues own alternate
+ exchange cannot find a match for the message, the
+ alternate-exchange of that exchange will be tried. Note:
+ though the spec rules out going to the alternate-exchanges
+ alternate exchange when sending to an exchange, it does not
+ cover this case.
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #setu up an 'intermediary' exchange
+ session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
+
+ #create a queue using the intermediary as its alternate exchange:
+ session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delivery-queue")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delivery-queue")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+
+ #get and reject those messages:
+ session.message_subscribe(destination="a", queue="delivery-queue")
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ incoming = session.incoming("a")
+ for m in ["One", "Two", "Three"]:
+ msg = incoming.get(timeout=1)
+ self.assertEqual(m, msg.body)
+ session.message_reject(RangedSet(msg.id))
+ session.message_cancel(destination="a")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+ #cleanup:
+ session.exchange_delete(exchange="my-exchange")
+ session.exchange_delete(exchange="dlq")
+
+ def setup_dlq(self):
+ session = self.session
+ #set up 'dead-letter' handling:
+ session.exchange_declare(exchange="dlq", type="fanout")
+ session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="dlq", queue="deleted")
+ session.message_subscribe(destination="dlq", queue="deleted")
+ session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ dlq = session.incoming("dlq")
+ return dlq
def assertEmpty(self, queue):
try: