summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-12-10 15:00:27 +0000
committerRobert Gemmell <robbie@apache.org>2010-12-10 15:00:27 +0000
commit27c90381d7f9bdf77d98e5dc83af8565ca11e8ec (patch)
tree78bf588983ae1bb342f9015f0aae135a8427529e
parentc4ab602008d26d0a5137da73768c91081ce89ef6 (diff)
downloadqpid-python-27c90381d7f9bdf77d98e5dc83af8565ca11e8ec.tar.gz
QPID-2973: create the StoreContext upfront for one-time use at later points, instead of creating them as required. Remove discarded message tag from the unacked map instead of allowing it to remain until the map is later cleared
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1044387 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java2
2 files changed, 14 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1897d37365..5317eb2fa3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1112,11 +1112,14 @@ public class AMQChannel
AMQQueue queue = rejectedQueueEntry.getQueue();
Exchange altExchange = queue.getAlternateExchange();
-
+
+ StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DL deliveryTag: " + deliveryTag);
+
if (altExchange == null)
{
_log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- rejectedQueueEntry.discard(new StoreContext());
+ unackedMap.remove(deliveryTag);
+ rejectedQueueEntry.discard(dlqStoreContext);
return;
}
@@ -1127,17 +1130,17 @@ public class AMQChannel
if (destinationQueues == null || destinationQueues.isEmpty())
{
_log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
- rejectedQueueEntry.discard(new StoreContext());
+ unackedMap.remove(deliveryTag);
+ rejectedQueueEntry.discard(dlqStoreContext);
return;
}
-
+
//increment the message reference count to include the new queue(s)
msg.incrementReference(destinationQueues.size());
-
- //create a new storeContext to use with a new TransactionContext for the DLQ process
- StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag);
+
+ //create a new storeContext to use with the TransactionContext for the DLQ process
DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext);
-
+
//enqueue the message on the new queues in the store if its persistent
if (msg.isPersistent())
{
@@ -1157,9 +1160,8 @@ public class AMQChannel
{
dlqTxnContext.deliver(destinationQueues.get(i), msg);
}
-
+
dlqTxnContext.commit();
-
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 786e997e8e..a831625cc9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -242,6 +242,8 @@ public class AMQQueueFactory
//ensure the queue is bound to the exchange
if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
{
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null);
}