diff options
author | Robert Gemmell <robbie@apache.org> | 2010-12-10 15:00:27 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-12-10 15:00:27 +0000 |
commit | 27c90381d7f9bdf77d98e5dc83af8565ca11e8ec (patch) | |
tree | 78bf588983ae1bb342f9015f0aae135a8427529e | |
parent | c4ab602008d26d0a5137da73768c91081ce89ef6 (diff) | |
download | qpid-python-27c90381d7f9bdf77d98e5dc83af8565ca11e8ec.tar.gz |
QPID-2973: create the StoreContext upfront for one-time use at later points, instead of creating them as required. Remove discarded message tag from the unacked map instead of allowing it to remain until the map is later cleared
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1044387 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 22 | ||||
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java | 2 |
2 files changed, 14 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1897d37365..5317eb2fa3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1112,11 +1112,14 @@ public class AMQChannel AMQQueue queue = rejectedQueueEntry.getQueue(); Exchange altExchange = queue.getAlternateExchange(); - + + StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DL deliveryTag: " + deliveryTag); + if (altExchange == null) { _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - rejectedQueueEntry.discard(new StoreContext()); + unackedMap.remove(deliveryTag); + rejectedQueueEntry.discard(dlqStoreContext); return; } @@ -1127,17 +1130,17 @@ public class AMQChannel if (destinationQueues == null || destinationQueues.isEmpty()) { _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - rejectedQueueEntry.discard(new StoreContext()); + unackedMap.remove(deliveryTag); + rejectedQueueEntry.discard(dlqStoreContext); return; } - + //increment the message reference count to include the new queue(s) msg.incrementReference(destinationQueues.size()); - - //create a new storeContext to use with a new TransactionContext for the DLQ process - StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag); + + //create a new storeContext to use with the TransactionContext for the DLQ process DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext); - + //enqueue the message on the new queues in the store if its persistent if (msg.isPersistent()) { @@ -1157,9 +1160,8 @@ public class AMQChannel { dlqTxnContext.deliver(destinationQueues.get(i), msg); } - + dlqTxnContext.commit(); - } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 786e997e8e..a831625cc9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -242,6 +242,8 @@ public class AMQQueueFactory //ensure the queue is bound to the exchange if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null); } |